Compare commits

...

4 Commits

Author SHA1 Message Date
iven
b6ffc60331 perf(diary): sticker_service 批量 GROUP BY 替代 N+1 贴纸计数 — 8a-C04
Some checks failed
Main Merge / backend (push) Has been cancelled
Main Merge / frontend (push) Has been cancelled
- list_sticker_packs: 单次 SQL GROUP BY pack_id 获取所有计数
- 2 次查询(packs + counts)替代 N+1 次
- 使用 PostgreSQL ANY() 传递 UUID 数组
- 测试 80/80 通过
2026-06-03 15:51:05 +08:00
iven
4e5c1287a6 perf(diary): parent_service 批量软删除替代逐条 UPDATE — 8a-C03
- delete_child_data 改用单条 SQL UPDATE ... WHERE 批量软删除
- 1 次 SQL 替代 N 次逐条 UPDATE(从 O(N) 降到 O(1) 查询)
- 移除不再需要的 TransactionTrait 导入
- 测试 80/80 通过
2026-06-03 15:48:29 +08:00
iven
3258acaa77 perf(diary): sync_service 批量预查询 + 事务化 — 8a-C02
- 按操作类型分组 create/update/delete
- UPDATE/DELETE 目标单次 IN 查询替代逐条 find(消除 N+1)
- 冲突前置检测: 从预取 HashMap 判断版本,再过滤有效操作
- 所有写操作在单个事务内完成(原子化)
- 辅助函数改用 ConnectionTrait 泛型(兼容 DatabaseConnection 和 DatabaseTransaction)
- 测试 80/80 通过
2026-06-03 15:45:36 +08:00
iven
0c9ada242a perf(diary): mood_stats 改用 SQL GROUP BY 替代全量加载 — 8a-C01
- get_mood_stats: SELECT mood, COUNT(*) GROUP BY 替代 all() + Rust 迭代
- calculate_streak: 仅查 date 列 + DISTINCT + 366天窗口裁剪
- 新增 mood_counts_map_aggregation 单元测试
- 测试 78/78 通过
2026-06-03 15:37:09 +08:00
4 changed files with 455 additions and 271 deletions

View File

@@ -1,12 +1,15 @@
// 心情统计服务 — 心情趋势与连续天数
//
// 性能优化 (8a-C01):
// - mood_counts: SQL GROUP BY 替代全量加载 + Rust 迭代
// - streak: 仅查 date 列 + DISTINCT + 时间窗口裁剪,避免加载所有日记字段
use chrono::{Duration, NaiveDate, Utc};
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use sea_orm::{ConnectionTrait, DatabaseConnection};
use serde::Deserialize;
use uuid::Uuid;
use crate::dto::{Mood, MoodCount, MoodStatsResp};
use crate::entity::journal_entry;
use crate::error::DiaryResult;
/// 统计查询范围
@@ -37,8 +40,8 @@ pub struct MoodStatsService;
impl MoodStatsService {
/// 获取心情统计
///
/// 统计指定时间范围内各心情出现次数、连续写日记天数、
/// 最常用心情等数据
/// 使用 SQL GROUP BY 聚合,避免全量加载日记到内存。
/// 性能: O(mood_types) 而非 O(total_journals)
pub async fn get_mood_stats(
tenant_id: Uuid,
user_id: Uuid,
@@ -47,24 +50,34 @@ impl MoodStatsService {
) -> DiaryResult<MoodStatsResp> {
let since_date = (Utc::now() - Duration::days(period.days())).date_naive();
// 查询时间范围内的日记
let journals = journal_entry::Entity::find()
.filter(journal_entry::Column::TenantId.eq(tenant_id))
.filter(journal_entry::Column::AuthorId.eq(user_id))
.filter(journal_entry::Column::Date.gte(since_date))
.filter(journal_entry::Column::DeletedAt.is_null())
.all(db)
.await?;
// SQL GROUP BY — 一次查询获取所有心情计数(替代全量加载)
let sql = r#"
SELECT mood, COUNT(*) AS count
FROM journal_entry
WHERE tenant_id = $1
AND author_id = $2
AND date >= $3
AND deleted_at IS NULL
GROUP BY mood
"#;
let total_journals = journals.len() as i32;
let stmt = sea_orm::Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
[tenant_id.into(), user_id.into(), since_date.into()],
);
// 计算各心情出现次数
let mut mood_counts_map: std::collections::HashMap<String, i32> =
let rows = db.query_all(stmt).await?;
let mut mood_counts_map: std::collections::HashMap<String, i64> =
std::collections::HashMap::new();
for journal in &journals {
*mood_counts_map
.entry(journal.mood.clone())
.or_insert(0) += 1;
let mut total_journals: i64 = 0;
for row in rows {
let mood: String = row.try_get_by_index::<String>(0)?;
let count: i64 = row.try_get_by_index::<i64>(1)?;
total_journals += count;
mood_counts_map.insert(mood, count);
}
let mood_counts: Vec<MoodCount> = mood_counts_map
@@ -77,53 +90,64 @@ impl MoodStatsService {
};
MoodCount {
mood: parse_mood(mood),
count,
count: count as i32,
percentage,
}
})
.collect();
// 查找最常用心情
let dominant_mood = mood_counts
.iter()
.max_by_key(|mc| mc.count)
.map(|mc| mc.mood.clone());
// 计算连续写日记天数
let streak_days = Self::calculate_streak(tenant_id, user_id, db).await?;
Ok(MoodStatsResp {
mood_counts,
streak_days,
total_journals,
total_journals: total_journals as i32,
dominant_mood,
})
}
/// 计算连续写日记天数
///
/// 从今天开始往前数,连续有日记记录的天数
/// 只查询 date 列 + DISTINCT按日期降序限定 366 天窗口
/// 避免全量加载所有日记字段到内存。
async fn calculate_streak(
tenant_id: Uuid,
user_id: Uuid,
db: &DatabaseConnection,
) -> DiaryResult<i32> {
let journals = journal_entry::Entity::find()
.filter(journal_entry::Column::TenantId.eq(tenant_id))
.filter(journal_entry::Column::AuthorId.eq(user_id))
.filter(journal_entry::Column::DeletedAt.is_null())
.all(db)
.await?;
let cutoff = (Utc::now() - Duration::days(366)).date_naive();
// 收集所有有日记的日期
let mut dates: std::collections::HashSet<NaiveDate> =
journals.into_iter().map(|j| j.date).collect();
let sql = r#"
SELECT DISTINCT date
FROM journal_entry
WHERE tenant_id = $1
AND author_id = $2
AND date >= $3
AND deleted_at IS NULL
ORDER BY date DESC
"#;
let stmt = sea_orm::Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
[tenant_id.into(), user_id.into(), cutoff.into()],
);
let rows = db.query_all(stmt).await?;
let dates: std::collections::HashSet<NaiveDate> = rows
.into_iter()
.filter_map(|row| row.try_get_by_index::<NaiveDate>(0).ok())
.collect();
let mut streak = 0i32;
let mut check_date = Utc::now().date_naive();
// 从今天开始往前检查
while dates.remove(&check_date) {
while dates.contains(&check_date) {
streak += 1;
check_date -= Duration::days(1);
}
@@ -206,8 +230,7 @@ mod tests {
let mut streak = 0i32;
let mut check_date = today;
let mut mutable_dates = dates.clone();
while mutable_dates.remove(&check_date) {
while dates.contains(&check_date) {
streak += 1;
check_date -= Duration::days(1);
}
@@ -223,8 +246,7 @@ mod tests {
let mut streak = 0i32;
let mut check_date = today;
let mut mutable_dates = dates.clone();
while mutable_dates.remove(&check_date) {
while dates.contains(&check_date) {
streak += 1;
check_date -= Duration::days(1);
}
@@ -240,8 +262,7 @@ mod tests {
let mut streak = 0i32;
let mut check_date = today;
let mut mutable_dates = dates.clone();
while mutable_dates.remove(&check_date) {
while dates.contains(&check_date) {
streak += 1;
check_date -= Duration::days(1);
}
@@ -258,8 +279,7 @@ mod tests {
let mut streak = 0i32;
let mut check_date = today;
let mut mutable_dates = dates.clone();
while mutable_dates.remove(&check_date) {
while dates.contains(&check_date) {
streak += 1;
check_date -= Duration::days(1);
}
@@ -271,9 +291,9 @@ mod tests {
#[test]
fn mood_counts_percentage_calculation() {
// 模拟聚合逻辑3 happy + 2 calm = 5 total
let total = 5i32;
let happy_count = 3i32;
let calm_count = 2i32;
let total = 5i64;
let happy_count = 3i64;
let calm_count = 2i64;
let happy_pct = (happy_count as f64 / total as f64) * 100.0;
let calm_pct = (calm_count as f64 / total as f64) * 100.0;
@@ -285,8 +305,38 @@ mod tests {
#[test]
fn mood_counts_empty_total_zero_percentage() {
// 无日记时,百分比为 0
let total = 0i32;
let total = 0i64;
let percentage = if total > 0 { 100.0 } else { 0.0 };
assert_eq!(percentage, 0.0);
}
#[test]
fn mood_counts_map_aggregation() {
// 模拟 SQL GROUP BY 结果在 Rust 中构建 MoodCount 列表
let mut map: std::collections::HashMap<String, i64> = std::collections::HashMap::new();
*map.entry("happy".to_string()).or_insert(0) += 3;
*map.entry("calm".to_string()).or_insert(0) += 2;
*map.entry("happy".to_string()).or_insert(0) += 1;
let total: i64 = map.values().sum();
assert_eq!(total, 6);
assert_eq!(*map.get("happy").unwrap(), 4);
assert_eq!(*map.get("calm").unwrap(), 2);
let counts: Vec<MoodCount> = map
.iter()
.map(|(mood, &count)| {
let percentage = (count as f64 / total as f64) * 100.0;
MoodCount {
mood: parse_mood(mood),
count: count as i32,
percentage,
}
})
.collect();
assert_eq!(counts.len(), 2);
let happy_mc = counts.iter().find(|c| matches!(c.mood, Mood::Happy)).unwrap();
assert!((happy_mc.percentage - 66.6667).abs() < 0.01);
}
}

View File

@@ -2,8 +2,8 @@
use chrono::Utc;
use sea_orm::{
ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, PaginatorTrait,
QueryFilter, QueryOrder, Set, TransactionTrait,
ActiveModelTrait, ColumnTrait, ConnectionTrait, DatabaseConnection, EntityTrait,
PaginatorTrait, QueryFilter, QueryOrder, Set,
};
use uuid::Uuid;
@@ -272,7 +272,8 @@ impl ParentService {
/// 删除孩子数据 — 软删除所有日记PIPL 删除权)
///
/// 软删除孩子全部未删除的日记,逐条设置 deleted_at
/// 使用单条 SQL UPDATE 批量软删除,替代逐条更新
/// 性能: 1 次 SQL 替代 N 次 UPDATE。
/// 发布 `diary.parent.data_deleted` 事件记录操作。
pub async fn delete_child_data(
tenant_id: Uuid,
@@ -283,32 +284,33 @@ impl ParentService {
) -> DiaryResult<usize> {
Self::verify_binding(tenant_id, parent_id, child_id, db).await?;
let journals = journal_entry::Entity::find()
.filter(journal_entry::Column::TenantId.eq(tenant_id))
.filter(journal_entry::Column::AuthorId.eq(child_id))
.filter(journal_entry::Column::DeletedAt.is_null())
.all(db)
.await?;
let count = journals.len();
let now = Utc::now();
// 事务软删除所有日记PIPL 删除权 — 原子操作,避免部分删除
db.transaction::<_, (), DiaryError>(|txn| {
Box::pin(async move {
for journal in journals {
let current_version = journal.version;
let mut active: journal_entry::ActiveModel = journal.into();
active.deleted_at = Set(Some(now));
active.updated_at = Set(now);
active.updated_by = Set(parent_id);
active.version = Set(current_version + 1);
active.update(txn).await?;
}
Ok(())
})
})
.await?;
// 批量软删除 — 单条 SQL 替代逐条 UPDATE性能优化 8a-C03
let sql = r#"
UPDATE journal_entries
SET deleted_at = $1,
updated_at = $1,
updated_by = $2,
version = version + 1
WHERE tenant_id = $3
AND author_id = $4
AND deleted_at IS NULL
"#;
let stmt = sea_orm::Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
[
now.into(),
parent_id.into(),
tenant_id.into(),
child_id.into(),
],
);
let result = db.execute(stmt).await?;
let count = result.rows_affected() as usize;
event_bus
.publish(

View File

@@ -1,8 +1,8 @@
// 贴纸服务 — 贴纸包与贴纸管理
use sea_orm::{
ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, PaginatorTrait,
QueryFilter, QueryOrder, Set,
ActiveModelTrait, ColumnTrait, ConnectionTrait, DatabaseConnection, EntityTrait,
PaginatorTrait, QueryFilter, QueryOrder, Set,
};
use uuid::Uuid;
@@ -16,7 +16,8 @@ pub struct StickerService;
impl StickerService {
/// 获取贴纸包列表
///
/// 返回所有可用的贴纸包,按分类和名称排序
/// 使用 SQL GROUP BY 批量获取贴纸计数,替代逐包 COUNT 查询
/// 性能: 2 次查询packs + counts替代 N+1 次。
pub async fn list_sticker_packs(
tenant_id: Uuid,
category: Option<String>,
@@ -36,24 +37,47 @@ impl StickerService {
.all(db)
.await?;
let mut result = Vec::with_capacity(packs.len());
for pack in packs {
let sticker_count = sticker::Entity::find()
.filter(sticker::Column::PackId.eq(pack.id))
.filter(sticker::Column::DeletedAt.is_null())
.count(db)
.await? as i32;
// 批量获取所有贴纸包的贴纸计数 — 单次 SQL GROUP BY替代 N+1 查询)
let pack_ids: Vec<Uuid> = packs.iter().map(|p| p.id).collect();
let count_map: std::collections::HashMap<Uuid, i64> = if !pack_ids.is_empty() {
let sql = r#"
SELECT pack_id, COUNT(*) AS count
FROM stickers
WHERE pack_id = ANY($1)
AND deleted_at IS NULL
GROUP BY pack_id
"#;
result.push(StickerPackResp {
let stmt = sea_orm::Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
[pack_ids.into()],
);
let rows = db.query_all(stmt).await?;
rows.into_iter()
.filter_map(|row| {
let pack_id: Uuid = row.try_get_by_index::<Uuid>(0).ok()?;
let count: i64 = row.try_get_by_index::<i64>(1).ok()?;
Some((pack_id, count))
})
.collect()
} else {
std::collections::HashMap::new()
};
let result: Vec<StickerPackResp> = packs
.into_iter()
.map(|pack| StickerPackResp {
id: pack.id,
name: pack.name,
description: pack.description,
cover_image_url: pack.thumbnail_url,
sticker_count,
sticker_count: *count_map.get(&pack.id).unwrap_or(&0) as i32,
is_free: pack.is_free,
category: pack.category,
});
}
})
.collect();
Ok(result)
}

View File

@@ -1,7 +1,17 @@
// 日记同步服务 — 版本号冲突检测 + 增量同步
//
// 性能优化 (8a-C02):
// - 批量预查询: UPDATE/DELETE 目标一次性 IN 查询,替代逐条 find
// - 事务化: 所有写操作在单个事务内完成
// - 冲突前置检测: 写入前从预取数据检测版本冲突
use std::collections::HashMap;
use chrono::{DateTime, Utc};
use sea_orm::{ColumnTrait, Condition, DatabaseConnection, EntityTrait, QueryFilter, Set};
use sea_orm::{
ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DatabaseConnection, EntityTrait,
QueryFilter, Set, TransactionTrait,
};
use uuid::Uuid;
use crate::dto::{ConflictInfo, SyncChange, SyncResp};
@@ -14,11 +24,12 @@ pub struct SyncService;
impl SyncService {
/// 同步:客户端提交变更,服务端返回服务端变更 + 冲突
///
/// 流程:
/// 1. 逐条处理客户端变更(create/update/delete
/// 2. 获取 last_sync_time 之后服务端更新的记录
/// 3. 检测版本冲突
/// 4. 返回 (server_changes, conflicts, sync_time)
/// 优化流程:
/// 1. create/update/delete 分组
/// 2. 批量预查询 update/delete 目标记录(单次 IN 查询)
/// 3. 从预取数据检测版本冲突,过滤出有效操作
/// 4. 单事务执行所有写操作
/// 5. 获取服务端变更
pub async fn sync(
tenant_id: Uuid,
user_id: Uuid,
@@ -26,29 +37,107 @@ impl SyncService {
client_changes: Vec<SyncChange>,
db: &DatabaseConnection,
) -> DiaryResult<SyncResp> {
let mut conflicts = Vec::new();
// 1. 分组: 按操作类型拆分
let mut creates: Vec<serde_json::Value> = Vec::new();
let mut updates: Vec<(Uuid, i32, serde_json::Value)> = Vec::new();
let mut deletes: Vec<(Uuid, i32)> = Vec::new();
// 1. 处理客户端变更
for change in client_changes {
if let Err(e) = Self::apply_client_change(tenant_id, user_id, change, db).await {
// 版本冲突收集到冲突列表,其他错误直接返回
match e {
DiaryError::VersionConflict {
local,
server,
} => {
conflicts.push(ConflictInfo {
journal_id: Uuid::nil(), // ID 在 apply_client_change 内部处理
local_version: local,
server_version: server,
});
}
_ => return Err(e),
match change {
SyncChange::CreateJournal { data } => creates.push(data),
SyncChange::UpdateJournal { id, version, data } => {
updates.push((id, version, data));
}
SyncChange::DeleteJournal { id, version } => deletes.push((id, version)),
}
}
// 2. 批量预查询: 一次性获取所有 update/delete 目标记录
let ids_to_fetch: Vec<Uuid> = updates
.iter()
.map(|(id, _, _)| *id)
.chain(deletes.iter().map(|(id, _)| *id))
.collect();
let existing_map: HashMap<Uuid, journal_entry::Model> = if !ids_to_fetch.is_empty() {
journal_entry::Entity::find()
.filter(journal_entry::Column::Id.is_in(ids_to_fetch))
.filter(journal_entry::Column::TenantId.eq(tenant_id))
.filter(journal_entry::Column::DeletedAt.is_null())
.all(db)
.await?
.into_iter()
.map(|m| (m.id, m))
.collect()
} else {
HashMap::new()
};
// 3. 冲突前置检测: 从预取数据判断版本,分离冲突和有效操作
let mut conflicts = Vec::new();
let mut valid_updates: Vec<(journal_entry::Model, serde_json::Value)> = Vec::new();
let mut valid_deletes: Vec<(journal_entry::Model, i32)> = Vec::new();
for (id, version, data) in updates {
match existing_map.get(&id) {
Some(existing) if existing.version == version => {
valid_updates.push((existing.clone(), data));
}
Some(existing) => {
conflicts.push(ConflictInfo {
journal_id: id,
local_version: version,
server_version: existing.version,
});
}
None => {
return Err(DiaryError::NotFound(format!("日记 {} 不存在", id)));
}
}
}
// 2. 获取服务端变更last_sync_time 之后更新的)
for (id, version) in deletes {
match existing_map.get(&id) {
Some(existing) if existing.version == version => {
valid_deletes.push((existing.clone(), version));
}
Some(existing) => {
conflicts.push(ConflictInfo {
journal_id: id,
local_version: version,
server_version: existing.version,
});
}
None => {
return Err(DiaryError::NotFound(format!("日记 {} 不存在", id)));
}
}
}
// 4. 单事务执行所有写操作
db.transaction::<_, (), DiaryError>(|txn| {
Box::pin(async move {
// 批量插入 creates
for data in creates {
Self::insert_journal(tenant_id, user_id, data, txn).await?;
}
// 批量更新 valid_updates
for (existing, data) in valid_updates {
Self::apply_update(user_id, existing, data, txn).await?;
}
// 批量软删除 valid_deletes
for (existing, version) in valid_deletes {
Self::apply_delete(user_id, existing, version, txn).await?;
}
Ok(())
})
})
.await?;
// 5. 获取服务端变更last_sync_time 之后更新的)
let mut condition = Condition::all()
.add(journal_entry::Column::TenantId.eq(tenant_id))
.add(journal_entry::Column::AuthorId.eq(user_id));
@@ -62,7 +151,6 @@ impl SyncService {
.all(db)
.await?;
// 3. 转换为 JSON 格式的服务端变更
let server_changes: Vec<serde_json::Value> = server_records
.iter()
.map(|r| {
@@ -82,7 +170,6 @@ impl SyncService {
})
.collect();
// 4. 返回同步结果
Ok(SyncResp {
server_changes,
conflicts,
@@ -90,147 +177,120 @@ impl SyncService {
})
}
/// 处理单条客户端变更
async fn apply_client_change(
/// 插入新日记(事务内)
async fn insert_journal<C: ConnectionTrait>(
tenant_id: Uuid,
user_id: Uuid,
change: SyncChange,
db: &DatabaseConnection,
data: serde_json::Value,
db: &C,
) -> DiaryResult<()> {
use sea_orm::ActiveModelTrait;
let id = data
.get("id")
.and_then(|v| v.as_str())
.and_then(|s| Uuid::parse_str(s).ok())
.unwrap_or_else(Uuid::now_v7);
match change {
SyncChange::CreateJournal { data } => {
// 客户端创建 — 直接插入
let id = data
.get("id")
let title = data
.get("title")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let now = Utc::now();
let model = journal_entry::ActiveModel {
id: Set(id),
tenant_id: Set(tenant_id),
author_id: Set(user_id),
class_id: Set(None),
title: Set(title),
date: Set(
data.get("date")
.and_then(|v| v.as_str())
.and_then(|s| Uuid::parse_str(s).ok())
.unwrap_or_else(Uuid::now_v7);
let title = data
.get("title")
.and_then(|s| s.parse().ok())
.unwrap_or_else(|| now.date_naive()),
),
mood: Set(
data.get("mood")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
.unwrap_or("happy")
.to_string(),
),
weather: Set(
data.get("weather")
.and_then(|v| v.as_str())
.unwrap_or("sunny")
.to_string(),
),
tags: Set(data.get("tags").cloned()),
is_private: Set(
data.get("is_private")
.and_then(|v| v.as_bool())
.unwrap_or(true),
),
shared_to_class: Set(false),
assigned_topic_id: Set(None),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(user_id),
updated_by: Set(user_id),
deleted_at: Set(None),
version: Set(1),
};
model.insert(db).await?;
Ok(())
}
let now = Utc::now();
let model = journal_entry::ActiveModel {
id: Set(id),
tenant_id: Set(tenant_id),
author_id: Set(user_id),
class_id: Set(None),
title: Set(title),
date: Set(
data.get("date")
.and_then(|v| v.as_str())
.and_then(|s| s.parse().ok())
.unwrap_or_else(|| now.date_naive()),
),
mood: Set(
data.get("mood")
.and_then(|v| v.as_str())
.unwrap_or("happy")
.to_string(),
),
weather: Set(
data.get("weather")
.and_then(|v| v.as_str())
.unwrap_or("sunny")
.to_string(),
),
tags: Set(data.get("tags").cloned()),
is_private: Set(
data.get("is_private")
.and_then(|v| v.as_bool())
.unwrap_or(true),
),
shared_to_class: Set(false),
assigned_topic_id: Set(None),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(user_id),
updated_by: Set(user_id),
deleted_at: Set(None),
version: Set(1),
};
model.insert(db).await?;
}
SyncChange::UpdateJournal {
id,
version,
data,
} => {
// 客户端更新 — 带版本检查
let existing = journal_entry::Entity::find()
.filter(journal_entry::Column::Id.eq(id))
.filter(journal_entry::Column::TenantId.eq(tenant_id))
.filter(journal_entry::Column::DeletedAt.is_null())
.one(db)
.await?
.ok_or_else(|| DiaryError::NotFound(format!("日记 {} 不存在", id)))?;
/// 应用更新(事务内,版本已预验证)
async fn apply_update<C: ConnectionTrait>(
user_id: Uuid,
existing: journal_entry::Model,
data: serde_json::Value,
db: &C,
) -> DiaryResult<()> {
let now = Utc::now();
let current_version = existing.version;
let mut active: journal_entry::ActiveModel = existing.into();
if existing.version != version {
return Err(DiaryError::VersionConflict {
local: version,
server: existing.version,
});
}
let now = Utc::now();
let mut active: journal_entry::ActiveModel = existing.into();
if let Some(title) = data.get("title").and_then(|v| v.as_str()) {
active.title = Set(title.to_string());
}
if let Some(mood) = data.get("mood").and_then(|v| v.as_str()) {
active.mood = Set(mood.to_string());
}
if let Some(weather) = data.get("weather").and_then(|v| v.as_str()) {
active.weather = Set(weather.to_string());
}
if let Some(tags) = data.get("tags").cloned() {
active.tags = Set(Some(tags));
}
if let Some(is_private) = data.get("is_private").and_then(|v| v.as_bool()) {
active.is_private = Set(is_private);
}
if let Some(shared) = data.get("shared_to_class").and_then(|v| v.as_bool()) {
active.shared_to_class = Set(shared);
}
active.updated_at = Set(now);
active.updated_by = Set(user_id);
active.version = Set(version + 1);
active.update(db).await?;
}
SyncChange::DeleteJournal { id, version } => {
// 客户端删除 — 软删除带版本检查
let existing = journal_entry::Entity::find()
.filter(journal_entry::Column::Id.eq(id))
.filter(journal_entry::Column::TenantId.eq(tenant_id))
.filter(journal_entry::Column::DeletedAt.is_null())
.one(db)
.await?
.ok_or_else(|| DiaryError::NotFound(format!("日记 {} 不存在", id)))?;
if existing.version != version {
return Err(DiaryError::VersionConflict {
local: version,
server: existing.version,
});
}
let now = Utc::now();
let mut active: journal_entry::ActiveModel = existing.into();
active.deleted_at = Set(Some(now));
active.updated_at = Set(now);
active.updated_by = Set(user_id);
active.version = Set(version + 1);
active.update(db).await?;
}
if let Some(title) = data.get("title").and_then(|v| v.as_str()) {
active.title = Set(title.to_string());
}
if let Some(mood) = data.get("mood").and_then(|v| v.as_str()) {
active.mood = Set(mood.to_string());
}
if let Some(weather) = data.get("weather").and_then(|v| v.as_str()) {
active.weather = Set(weather.to_string());
}
if let Some(tags) = data.get("tags").cloned() {
active.tags = Set(Some(tags));
}
if let Some(is_private) = data.get("is_private").and_then(|v| v.as_bool()) {
active.is_private = Set(is_private);
}
if let Some(shared) = data.get("shared_to_class").and_then(|v| v.as_bool()) {
active.shared_to_class = Set(shared);
}
active.updated_at = Set(now);
active.updated_by = Set(user_id);
active.version = Set(current_version + 1);
active.update(db).await?;
Ok(())
}
/// 应用软删除(事务内,版本已预验证)
async fn apply_delete<C: ConnectionTrait>(
user_id: Uuid,
existing: journal_entry::Model,
version: i32,
db: &C,
) -> DiaryResult<()> {
let now = Utc::now();
let mut active: journal_entry::ActiveModel = existing.into();
active.deleted_at = Set(Some(now));
active.updated_at = Set(now);
active.updated_by = Set(user_id);
active.version = Set(version + 1);
active.update(db).await?;
Ok(())
}
}
@@ -276,7 +336,6 @@ mod tests {
});
let change = SyncChange::CreateJournal { data: data.clone() };
// 验证 match 可以正确提取 data
match &change {
SyncChange::CreateJournal { data } => {
assert_eq!(data.get("title").unwrap().as_str().unwrap(), "我的日记");
@@ -344,43 +403,40 @@ mod tests {
#[test]
fn conflict_collection_pattern_mimics_sync_logic() {
// 模拟 sync 方法中 VersionConflict 收集到 conflicts 列表的行为
// 模拟批量预查询后的冲突检测逻辑
let mut conflicts: Vec<ConflictInfo> = Vec::new();
let errors: Vec<DiaryError> = vec![
DiaryError::VersionConflict {
local: 1,
server: 3,
},
DiaryError::VersionConflict {
local: 2,
server: 5,
},
];
// 模拟预取的 existing_map: id → (version)
let id1 = uuid::Uuid::now_v7();
let id2 = uuid::Uuid::now_v7();
let existing: HashMap<Uuid, i32> = [(id1, 3), (id2, 5)].into_iter().collect();
for e in errors {
match e {
DiaryError::VersionConflict { local, server } => {
// 客户端变更
let updates: Vec<(Uuid, i32)> = vec![(id1, 1), (id2, 5)];
for (id, local_ver) in updates {
match existing.get(&id) {
Some(&server_ver) if server_ver == local_ver => {
// 版本匹配 — 有效更新
}
Some(&server_ver) => {
conflicts.push(ConflictInfo {
journal_id: uuid::Uuid::nil(),
local_version: local,
server_version: server,
journal_id: id,
local_version: local_ver,
server_version: server_ver,
});
}
_ => {}
None => {}
}
}
assert_eq!(conflicts.len(), 2);
assert_eq!(conflicts.len(), 1);
assert_eq!(conflicts[0].local_version, 1);
assert_eq!(conflicts[0].server_version, 3);
assert_eq!(conflicts[1].local_version, 2);
assert_eq!(conflicts[1].server_version, 5);
}
#[test]
fn non_conflict_error_does_not_collect() {
// 验证非 VersionConflict 错误不会被收集到 conflicts 列表
let mut conflicts: Vec<ConflictInfo> = Vec::new();
let error = DiaryError::NotFound("日记不存在".to_string());
@@ -392,11 +448,63 @@ mod tests {
server_version: server,
});
}
_ => {
// 其他错误不应收集
}
_ => {}
}
assert!(conflicts.is_empty());
}
#[test]
fn batch_partition_correctly_separates_types() {
let changes = vec![
SyncChange::CreateJournal {
data: serde_json::json!({"title": "日记1"}),
},
SyncChange::UpdateJournal {
id: uuid::Uuid::nil(),
version: 1,
data: serde_json::json!({"title": "更新"}),
},
SyncChange::DeleteJournal {
id: uuid::Uuid::nil(),
version: 2,
},
SyncChange::CreateJournal {
data: serde_json::json!({"title": "日记2"}),
},
];
let mut creates = Vec::new();
let mut updates = Vec::new();
let mut deletes = Vec::new();
for change in changes {
match change {
SyncChange::CreateJournal { data } => creates.push(data),
SyncChange::UpdateJournal { id, version, data } => {
updates.push((id, version, data));
}
SyncChange::DeleteJournal { id, version } => deletes.push((id, version)),
}
}
assert_eq!(creates.len(), 2);
assert_eq!(updates.len(), 1);
assert_eq!(deletes.len(), 1);
}
#[test]
fn version_match_detection() {
// 模拟预取数据中的版本匹配
let id = uuid::Uuid::now_v7();
let existing: HashMap<Uuid, i32> = [(id, 5)].into_iter().collect();
// 版本匹配
let matched = existing.get(&id).map_or(false, |&sv| sv == 5);
assert!(matched);
// 版本不匹配
let mismatched = existing.get(&id).map_or(false, |&sv| sv == 3);
assert!(!mismatched);
}
}