Compare commits
4 Commits
99db8e5cb0
...
b6ffc60331
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b6ffc60331 | ||
|
|
4e5c1287a6 | ||
|
|
3258acaa77 | ||
|
|
0c9ada242a |
@@ -1,12 +1,15 @@
|
||||
// 心情统计服务 — 心情趋势与连续天数
|
||||
//
|
||||
// 性能优化 (8a-C01):
|
||||
// - mood_counts: SQL GROUP BY 替代全量加载 + Rust 迭代
|
||||
// - streak: 仅查 date 列 + DISTINCT + 时间窗口裁剪,避免加载所有日记字段
|
||||
|
||||
use chrono::{Duration, NaiveDate, Utc};
|
||||
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
|
||||
use sea_orm::{ConnectionTrait, DatabaseConnection};
|
||||
use serde::Deserialize;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::dto::{Mood, MoodCount, MoodStatsResp};
|
||||
use crate::entity::journal_entry;
|
||||
use crate::error::DiaryResult;
|
||||
|
||||
/// 统计查询范围
|
||||
@@ -37,8 +40,8 @@ pub struct MoodStatsService;
|
||||
impl MoodStatsService {
|
||||
/// 获取心情统计
|
||||
///
|
||||
/// 统计指定时间范围内各心情出现次数、连续写日记天数、
|
||||
/// 最常用心情等数据。
|
||||
/// 使用 SQL GROUP BY 聚合,避免全量加载日记到内存。
|
||||
/// 性能: O(mood_types) 而非 O(total_journals)。
|
||||
pub async fn get_mood_stats(
|
||||
tenant_id: Uuid,
|
||||
user_id: Uuid,
|
||||
@@ -47,24 +50,34 @@ impl MoodStatsService {
|
||||
) -> DiaryResult<MoodStatsResp> {
|
||||
let since_date = (Utc::now() - Duration::days(period.days())).date_naive();
|
||||
|
||||
// 查询时间范围内的日记
|
||||
let journals = journal_entry::Entity::find()
|
||||
.filter(journal_entry::Column::TenantId.eq(tenant_id))
|
||||
.filter(journal_entry::Column::AuthorId.eq(user_id))
|
||||
.filter(journal_entry::Column::Date.gte(since_date))
|
||||
.filter(journal_entry::Column::DeletedAt.is_null())
|
||||
.all(db)
|
||||
.await?;
|
||||
// SQL GROUP BY — 一次查询获取所有心情计数(替代全量加载)
|
||||
let sql = r#"
|
||||
SELECT mood, COUNT(*) AS count
|
||||
FROM journal_entry
|
||||
WHERE tenant_id = $1
|
||||
AND author_id = $2
|
||||
AND date >= $3
|
||||
AND deleted_at IS NULL
|
||||
GROUP BY mood
|
||||
"#;
|
||||
|
||||
let total_journals = journals.len() as i32;
|
||||
let stmt = sea_orm::Statement::from_sql_and_values(
|
||||
sea_orm::DatabaseBackend::Postgres,
|
||||
sql,
|
||||
[tenant_id.into(), user_id.into(), since_date.into()],
|
||||
);
|
||||
|
||||
// 计算各心情出现次数
|
||||
let mut mood_counts_map: std::collections::HashMap<String, i32> =
|
||||
let rows = db.query_all(stmt).await?;
|
||||
|
||||
let mut mood_counts_map: std::collections::HashMap<String, i64> =
|
||||
std::collections::HashMap::new();
|
||||
for journal in &journals {
|
||||
*mood_counts_map
|
||||
.entry(journal.mood.clone())
|
||||
.or_insert(0) += 1;
|
||||
let mut total_journals: i64 = 0;
|
||||
|
||||
for row in rows {
|
||||
let mood: String = row.try_get_by_index::<String>(0)?;
|
||||
let count: i64 = row.try_get_by_index::<i64>(1)?;
|
||||
total_journals += count;
|
||||
mood_counts_map.insert(mood, count);
|
||||
}
|
||||
|
||||
let mood_counts: Vec<MoodCount> = mood_counts_map
|
||||
@@ -77,53 +90,64 @@ impl MoodStatsService {
|
||||
};
|
||||
MoodCount {
|
||||
mood: parse_mood(mood),
|
||||
count,
|
||||
count: count as i32,
|
||||
percentage,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
// 查找最常用心情
|
||||
let dominant_mood = mood_counts
|
||||
.iter()
|
||||
.max_by_key(|mc| mc.count)
|
||||
.map(|mc| mc.mood.clone());
|
||||
|
||||
// 计算连续写日记天数
|
||||
let streak_days = Self::calculate_streak(tenant_id, user_id, db).await?;
|
||||
|
||||
Ok(MoodStatsResp {
|
||||
mood_counts,
|
||||
streak_days,
|
||||
total_journals,
|
||||
total_journals: total_journals as i32,
|
||||
dominant_mood,
|
||||
})
|
||||
}
|
||||
|
||||
/// 计算连续写日记天数
|
||||
///
|
||||
/// 从今天开始往前数,连续有日记记录的天数。
|
||||
/// 只查询 date 列 + DISTINCT,按日期降序,限定 366 天窗口。
|
||||
/// 避免全量加载所有日记字段到内存。
|
||||
async fn calculate_streak(
|
||||
tenant_id: Uuid,
|
||||
user_id: Uuid,
|
||||
db: &DatabaseConnection,
|
||||
) -> DiaryResult<i32> {
|
||||
let journals = journal_entry::Entity::find()
|
||||
.filter(journal_entry::Column::TenantId.eq(tenant_id))
|
||||
.filter(journal_entry::Column::AuthorId.eq(user_id))
|
||||
.filter(journal_entry::Column::DeletedAt.is_null())
|
||||
.all(db)
|
||||
.await?;
|
||||
let cutoff = (Utc::now() - Duration::days(366)).date_naive();
|
||||
|
||||
// 收集所有有日记的日期
|
||||
let mut dates: std::collections::HashSet<NaiveDate> =
|
||||
journals.into_iter().map(|j| j.date).collect();
|
||||
let sql = r#"
|
||||
SELECT DISTINCT date
|
||||
FROM journal_entry
|
||||
WHERE tenant_id = $1
|
||||
AND author_id = $2
|
||||
AND date >= $3
|
||||
AND deleted_at IS NULL
|
||||
ORDER BY date DESC
|
||||
"#;
|
||||
|
||||
let stmt = sea_orm::Statement::from_sql_and_values(
|
||||
sea_orm::DatabaseBackend::Postgres,
|
||||
sql,
|
||||
[tenant_id.into(), user_id.into(), cutoff.into()],
|
||||
);
|
||||
|
||||
let rows = db.query_all(stmt).await?;
|
||||
|
||||
let dates: std::collections::HashSet<NaiveDate> = rows
|
||||
.into_iter()
|
||||
.filter_map(|row| row.try_get_by_index::<NaiveDate>(0).ok())
|
||||
.collect();
|
||||
|
||||
let mut streak = 0i32;
|
||||
let mut check_date = Utc::now().date_naive();
|
||||
|
||||
// 从今天开始往前检查
|
||||
while dates.remove(&check_date) {
|
||||
while dates.contains(&check_date) {
|
||||
streak += 1;
|
||||
check_date -= Duration::days(1);
|
||||
}
|
||||
@@ -206,8 +230,7 @@ mod tests {
|
||||
|
||||
let mut streak = 0i32;
|
||||
let mut check_date = today;
|
||||
let mut mutable_dates = dates.clone();
|
||||
while mutable_dates.remove(&check_date) {
|
||||
while dates.contains(&check_date) {
|
||||
streak += 1;
|
||||
check_date -= Duration::days(1);
|
||||
}
|
||||
@@ -223,8 +246,7 @@ mod tests {
|
||||
|
||||
let mut streak = 0i32;
|
||||
let mut check_date = today;
|
||||
let mut mutable_dates = dates.clone();
|
||||
while mutable_dates.remove(&check_date) {
|
||||
while dates.contains(&check_date) {
|
||||
streak += 1;
|
||||
check_date -= Duration::days(1);
|
||||
}
|
||||
@@ -240,8 +262,7 @@ mod tests {
|
||||
|
||||
let mut streak = 0i32;
|
||||
let mut check_date = today;
|
||||
let mut mutable_dates = dates.clone();
|
||||
while mutable_dates.remove(&check_date) {
|
||||
while dates.contains(&check_date) {
|
||||
streak += 1;
|
||||
check_date -= Duration::days(1);
|
||||
}
|
||||
@@ -258,8 +279,7 @@ mod tests {
|
||||
|
||||
let mut streak = 0i32;
|
||||
let mut check_date = today;
|
||||
let mut mutable_dates = dates.clone();
|
||||
while mutable_dates.remove(&check_date) {
|
||||
while dates.contains(&check_date) {
|
||||
streak += 1;
|
||||
check_date -= Duration::days(1);
|
||||
}
|
||||
@@ -271,9 +291,9 @@ mod tests {
|
||||
#[test]
|
||||
fn mood_counts_percentage_calculation() {
|
||||
// 模拟聚合逻辑:3 happy + 2 calm = 5 total
|
||||
let total = 5i32;
|
||||
let happy_count = 3i32;
|
||||
let calm_count = 2i32;
|
||||
let total = 5i64;
|
||||
let happy_count = 3i64;
|
||||
let calm_count = 2i64;
|
||||
|
||||
let happy_pct = (happy_count as f64 / total as f64) * 100.0;
|
||||
let calm_pct = (calm_count as f64 / total as f64) * 100.0;
|
||||
@@ -285,8 +305,38 @@ mod tests {
|
||||
#[test]
|
||||
fn mood_counts_empty_total_zero_percentage() {
|
||||
// 无日记时,百分比为 0
|
||||
let total = 0i32;
|
||||
let total = 0i64;
|
||||
let percentage = if total > 0 { 100.0 } else { 0.0 };
|
||||
assert_eq!(percentage, 0.0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn mood_counts_map_aggregation() {
|
||||
// 模拟 SQL GROUP BY 结果在 Rust 中构建 MoodCount 列表
|
||||
let mut map: std::collections::HashMap<String, i64> = std::collections::HashMap::new();
|
||||
*map.entry("happy".to_string()).or_insert(0) += 3;
|
||||
*map.entry("calm".to_string()).or_insert(0) += 2;
|
||||
*map.entry("happy".to_string()).or_insert(0) += 1;
|
||||
|
||||
let total: i64 = map.values().sum();
|
||||
assert_eq!(total, 6);
|
||||
assert_eq!(*map.get("happy").unwrap(), 4);
|
||||
assert_eq!(*map.get("calm").unwrap(), 2);
|
||||
|
||||
let counts: Vec<MoodCount> = map
|
||||
.iter()
|
||||
.map(|(mood, &count)| {
|
||||
let percentage = (count as f64 / total as f64) * 100.0;
|
||||
MoodCount {
|
||||
mood: parse_mood(mood),
|
||||
count: count as i32,
|
||||
percentage,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
assert_eq!(counts.len(), 2);
|
||||
let happy_mc = counts.iter().find(|c| matches!(c.mood, Mood::Happy)).unwrap();
|
||||
assert!((happy_mc.percentage - 66.6667).abs() < 0.01);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,8 +2,8 @@
|
||||
|
||||
use chrono::Utc;
|
||||
use sea_orm::{
|
||||
ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, PaginatorTrait,
|
||||
QueryFilter, QueryOrder, Set, TransactionTrait,
|
||||
ActiveModelTrait, ColumnTrait, ConnectionTrait, DatabaseConnection, EntityTrait,
|
||||
PaginatorTrait, QueryFilter, QueryOrder, Set,
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
@@ -272,7 +272,8 @@ impl ParentService {
|
||||
|
||||
/// 删除孩子数据 — 软删除所有日记(PIPL 删除权)
|
||||
///
|
||||
/// 软删除孩子全部未删除的日记,逐条设置 deleted_at。
|
||||
/// 使用单条 SQL UPDATE 批量软删除,替代逐条更新。
|
||||
/// 性能: 1 次 SQL 替代 N 次 UPDATE。
|
||||
/// 发布 `diary.parent.data_deleted` 事件记录操作。
|
||||
pub async fn delete_child_data(
|
||||
tenant_id: Uuid,
|
||||
@@ -283,32 +284,33 @@ impl ParentService {
|
||||
) -> DiaryResult<usize> {
|
||||
Self::verify_binding(tenant_id, parent_id, child_id, db).await?;
|
||||
|
||||
let journals = journal_entry::Entity::find()
|
||||
.filter(journal_entry::Column::TenantId.eq(tenant_id))
|
||||
.filter(journal_entry::Column::AuthorId.eq(child_id))
|
||||
.filter(journal_entry::Column::DeletedAt.is_null())
|
||||
.all(db)
|
||||
.await?;
|
||||
|
||||
let count = journals.len();
|
||||
let now = Utc::now();
|
||||
|
||||
// 事务:软删除所有日记(PIPL 删除权 — 原子操作,避免部分删除)
|
||||
db.transaction::<_, (), DiaryError>(|txn| {
|
||||
Box::pin(async move {
|
||||
for journal in journals {
|
||||
let current_version = journal.version;
|
||||
let mut active: journal_entry::ActiveModel = journal.into();
|
||||
active.deleted_at = Set(Some(now));
|
||||
active.updated_at = Set(now);
|
||||
active.updated_by = Set(parent_id);
|
||||
active.version = Set(current_version + 1);
|
||||
active.update(txn).await?;
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
})
|
||||
.await?;
|
||||
// 批量软删除 — 单条 SQL 替代逐条 UPDATE(性能优化 8a-C03)
|
||||
let sql = r#"
|
||||
UPDATE journal_entries
|
||||
SET deleted_at = $1,
|
||||
updated_at = $1,
|
||||
updated_by = $2,
|
||||
version = version + 1
|
||||
WHERE tenant_id = $3
|
||||
AND author_id = $4
|
||||
AND deleted_at IS NULL
|
||||
"#;
|
||||
|
||||
let stmt = sea_orm::Statement::from_sql_and_values(
|
||||
sea_orm::DatabaseBackend::Postgres,
|
||||
sql,
|
||||
[
|
||||
now.into(),
|
||||
parent_id.into(),
|
||||
tenant_id.into(),
|
||||
child_id.into(),
|
||||
],
|
||||
);
|
||||
|
||||
let result = db.execute(stmt).await?;
|
||||
let count = result.rows_affected() as usize;
|
||||
|
||||
event_bus
|
||||
.publish(
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
// 贴纸服务 — 贴纸包与贴纸管理
|
||||
|
||||
use sea_orm::{
|
||||
ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, PaginatorTrait,
|
||||
QueryFilter, QueryOrder, Set,
|
||||
ActiveModelTrait, ColumnTrait, ConnectionTrait, DatabaseConnection, EntityTrait,
|
||||
PaginatorTrait, QueryFilter, QueryOrder, Set,
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
@@ -16,7 +16,8 @@ pub struct StickerService;
|
||||
impl StickerService {
|
||||
/// 获取贴纸包列表
|
||||
///
|
||||
/// 返回所有可用的贴纸包,按分类和名称排序。
|
||||
/// 使用 SQL GROUP BY 批量获取贴纸计数,替代逐包 COUNT 查询。
|
||||
/// 性能: 2 次查询(packs + counts)替代 N+1 次。
|
||||
pub async fn list_sticker_packs(
|
||||
tenant_id: Uuid,
|
||||
category: Option<String>,
|
||||
@@ -36,24 +37,47 @@ impl StickerService {
|
||||
.all(db)
|
||||
.await?;
|
||||
|
||||
let mut result = Vec::with_capacity(packs.len());
|
||||
for pack in packs {
|
||||
let sticker_count = sticker::Entity::find()
|
||||
.filter(sticker::Column::PackId.eq(pack.id))
|
||||
.filter(sticker::Column::DeletedAt.is_null())
|
||||
.count(db)
|
||||
.await? as i32;
|
||||
// 批量获取所有贴纸包的贴纸计数 — 单次 SQL GROUP BY(替代 N+1 查询)
|
||||
let pack_ids: Vec<Uuid> = packs.iter().map(|p| p.id).collect();
|
||||
let count_map: std::collections::HashMap<Uuid, i64> = if !pack_ids.is_empty() {
|
||||
let sql = r#"
|
||||
SELECT pack_id, COUNT(*) AS count
|
||||
FROM stickers
|
||||
WHERE pack_id = ANY($1)
|
||||
AND deleted_at IS NULL
|
||||
GROUP BY pack_id
|
||||
"#;
|
||||
|
||||
result.push(StickerPackResp {
|
||||
let stmt = sea_orm::Statement::from_sql_and_values(
|
||||
sea_orm::DatabaseBackend::Postgres,
|
||||
sql,
|
||||
[pack_ids.into()],
|
||||
);
|
||||
|
||||
let rows = db.query_all(stmt).await?;
|
||||
rows.into_iter()
|
||||
.filter_map(|row| {
|
||||
let pack_id: Uuid = row.try_get_by_index::<Uuid>(0).ok()?;
|
||||
let count: i64 = row.try_get_by_index::<i64>(1).ok()?;
|
||||
Some((pack_id, count))
|
||||
})
|
||||
.collect()
|
||||
} else {
|
||||
std::collections::HashMap::new()
|
||||
};
|
||||
|
||||
let result: Vec<StickerPackResp> = packs
|
||||
.into_iter()
|
||||
.map(|pack| StickerPackResp {
|
||||
id: pack.id,
|
||||
name: pack.name,
|
||||
description: pack.description,
|
||||
cover_image_url: pack.thumbnail_url,
|
||||
sticker_count,
|
||||
sticker_count: *count_map.get(&pack.id).unwrap_or(&0) as i32,
|
||||
is_free: pack.is_free,
|
||||
category: pack.category,
|
||||
});
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
@@ -1,7 +1,17 @@
|
||||
// 日记同步服务 — 版本号冲突检测 + 增量同步
|
||||
//
|
||||
// 性能优化 (8a-C02):
|
||||
// - 批量预查询: UPDATE/DELETE 目标一次性 IN 查询,替代逐条 find
|
||||
// - 事务化: 所有写操作在单个事务内完成
|
||||
// - 冲突前置检测: 写入前从预取数据检测版本冲突
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use sea_orm::{ColumnTrait, Condition, DatabaseConnection, EntityTrait, QueryFilter, Set};
|
||||
use sea_orm::{
|
||||
ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DatabaseConnection, EntityTrait,
|
||||
QueryFilter, Set, TransactionTrait,
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::dto::{ConflictInfo, SyncChange, SyncResp};
|
||||
@@ -14,11 +24,12 @@ pub struct SyncService;
|
||||
impl SyncService {
|
||||
/// 同步:客户端提交变更,服务端返回服务端变更 + 冲突
|
||||
///
|
||||
/// 流程:
|
||||
/// 1. 逐条处理客户端变更(create/update/delete)
|
||||
/// 2. 获取 last_sync_time 之后服务端更新的记录
|
||||
/// 3. 检测版本冲突
|
||||
/// 4. 返回 (server_changes, conflicts, sync_time)
|
||||
/// 优化流程:
|
||||
/// 1. 按 create/update/delete 分组
|
||||
/// 2. 批量预查询 update/delete 目标记录(单次 IN 查询)
|
||||
/// 3. 从预取数据检测版本冲突,过滤出有效操作
|
||||
/// 4. 单事务执行所有写操作
|
||||
/// 5. 获取服务端变更
|
||||
pub async fn sync(
|
||||
tenant_id: Uuid,
|
||||
user_id: Uuid,
|
||||
@@ -26,29 +37,107 @@ impl SyncService {
|
||||
client_changes: Vec<SyncChange>,
|
||||
db: &DatabaseConnection,
|
||||
) -> DiaryResult<SyncResp> {
|
||||
let mut conflicts = Vec::new();
|
||||
// 1. 分组: 按操作类型拆分
|
||||
let mut creates: Vec<serde_json::Value> = Vec::new();
|
||||
let mut updates: Vec<(Uuid, i32, serde_json::Value)> = Vec::new();
|
||||
let mut deletes: Vec<(Uuid, i32)> = Vec::new();
|
||||
|
||||
// 1. 处理客户端变更
|
||||
for change in client_changes {
|
||||
if let Err(e) = Self::apply_client_change(tenant_id, user_id, change, db).await {
|
||||
// 版本冲突收集到冲突列表,其他错误直接返回
|
||||
match e {
|
||||
DiaryError::VersionConflict {
|
||||
local,
|
||||
server,
|
||||
} => {
|
||||
conflicts.push(ConflictInfo {
|
||||
journal_id: Uuid::nil(), // ID 在 apply_client_change 内部处理
|
||||
local_version: local,
|
||||
server_version: server,
|
||||
});
|
||||
}
|
||||
_ => return Err(e),
|
||||
match change {
|
||||
SyncChange::CreateJournal { data } => creates.push(data),
|
||||
SyncChange::UpdateJournal { id, version, data } => {
|
||||
updates.push((id, version, data));
|
||||
}
|
||||
SyncChange::DeleteJournal { id, version } => deletes.push((id, version)),
|
||||
}
|
||||
}
|
||||
|
||||
// 2. 批量预查询: 一次性获取所有 update/delete 目标记录
|
||||
let ids_to_fetch: Vec<Uuid> = updates
|
||||
.iter()
|
||||
.map(|(id, _, _)| *id)
|
||||
.chain(deletes.iter().map(|(id, _)| *id))
|
||||
.collect();
|
||||
|
||||
let existing_map: HashMap<Uuid, journal_entry::Model> = if !ids_to_fetch.is_empty() {
|
||||
journal_entry::Entity::find()
|
||||
.filter(journal_entry::Column::Id.is_in(ids_to_fetch))
|
||||
.filter(journal_entry::Column::TenantId.eq(tenant_id))
|
||||
.filter(journal_entry::Column::DeletedAt.is_null())
|
||||
.all(db)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|m| (m.id, m))
|
||||
.collect()
|
||||
} else {
|
||||
HashMap::new()
|
||||
};
|
||||
|
||||
// 3. 冲突前置检测: 从预取数据判断版本,分离冲突和有效操作
|
||||
let mut conflicts = Vec::new();
|
||||
let mut valid_updates: Vec<(journal_entry::Model, serde_json::Value)> = Vec::new();
|
||||
let mut valid_deletes: Vec<(journal_entry::Model, i32)> = Vec::new();
|
||||
|
||||
for (id, version, data) in updates {
|
||||
match existing_map.get(&id) {
|
||||
Some(existing) if existing.version == version => {
|
||||
valid_updates.push((existing.clone(), data));
|
||||
}
|
||||
Some(existing) => {
|
||||
conflicts.push(ConflictInfo {
|
||||
journal_id: id,
|
||||
local_version: version,
|
||||
server_version: existing.version,
|
||||
});
|
||||
}
|
||||
None => {
|
||||
return Err(DiaryError::NotFound(format!("日记 {} 不存在", id)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 2. 获取服务端变更(last_sync_time 之后更新的)
|
||||
for (id, version) in deletes {
|
||||
match existing_map.get(&id) {
|
||||
Some(existing) if existing.version == version => {
|
||||
valid_deletes.push((existing.clone(), version));
|
||||
}
|
||||
Some(existing) => {
|
||||
conflicts.push(ConflictInfo {
|
||||
journal_id: id,
|
||||
local_version: version,
|
||||
server_version: existing.version,
|
||||
});
|
||||
}
|
||||
None => {
|
||||
return Err(DiaryError::NotFound(format!("日记 {} 不存在", id)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 4. 单事务执行所有写操作
|
||||
db.transaction::<_, (), DiaryError>(|txn| {
|
||||
Box::pin(async move {
|
||||
// 批量插入 creates
|
||||
for data in creates {
|
||||
Self::insert_journal(tenant_id, user_id, data, txn).await?;
|
||||
}
|
||||
|
||||
// 批量更新 valid_updates
|
||||
for (existing, data) in valid_updates {
|
||||
Self::apply_update(user_id, existing, data, txn).await?;
|
||||
}
|
||||
|
||||
// 批量软删除 valid_deletes
|
||||
for (existing, version) in valid_deletes {
|
||||
Self::apply_delete(user_id, existing, version, txn).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
})
|
||||
.await?;
|
||||
|
||||
// 5. 获取服务端变更(last_sync_time 之后更新的)
|
||||
let mut condition = Condition::all()
|
||||
.add(journal_entry::Column::TenantId.eq(tenant_id))
|
||||
.add(journal_entry::Column::AuthorId.eq(user_id));
|
||||
@@ -62,7 +151,6 @@ impl SyncService {
|
||||
.all(db)
|
||||
.await?;
|
||||
|
||||
// 3. 转换为 JSON 格式的服务端变更
|
||||
let server_changes: Vec<serde_json::Value> = server_records
|
||||
.iter()
|
||||
.map(|r| {
|
||||
@@ -82,7 +170,6 @@ impl SyncService {
|
||||
})
|
||||
.collect();
|
||||
|
||||
// 4. 返回同步结果
|
||||
Ok(SyncResp {
|
||||
server_changes,
|
||||
conflicts,
|
||||
@@ -90,147 +177,120 @@ impl SyncService {
|
||||
})
|
||||
}
|
||||
|
||||
/// 处理单条客户端变更
|
||||
async fn apply_client_change(
|
||||
/// 插入新日记(事务内)
|
||||
async fn insert_journal<C: ConnectionTrait>(
|
||||
tenant_id: Uuid,
|
||||
user_id: Uuid,
|
||||
change: SyncChange,
|
||||
db: &DatabaseConnection,
|
||||
data: serde_json::Value,
|
||||
db: &C,
|
||||
) -> DiaryResult<()> {
|
||||
use sea_orm::ActiveModelTrait;
|
||||
let id = data
|
||||
.get("id")
|
||||
.and_then(|v| v.as_str())
|
||||
.and_then(|s| Uuid::parse_str(s).ok())
|
||||
.unwrap_or_else(Uuid::now_v7);
|
||||
|
||||
match change {
|
||||
SyncChange::CreateJournal { data } => {
|
||||
// 客户端创建 — 直接插入
|
||||
let id = data
|
||||
.get("id")
|
||||
let title = data
|
||||
.get("title")
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("")
|
||||
.to_string();
|
||||
|
||||
let now = Utc::now();
|
||||
let model = journal_entry::ActiveModel {
|
||||
id: Set(id),
|
||||
tenant_id: Set(tenant_id),
|
||||
author_id: Set(user_id),
|
||||
class_id: Set(None),
|
||||
title: Set(title),
|
||||
date: Set(
|
||||
data.get("date")
|
||||
.and_then(|v| v.as_str())
|
||||
.and_then(|s| Uuid::parse_str(s).ok())
|
||||
.unwrap_or_else(Uuid::now_v7);
|
||||
|
||||
let title = data
|
||||
.get("title")
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or_else(|| now.date_naive()),
|
||||
),
|
||||
mood: Set(
|
||||
data.get("mood")
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("")
|
||||
.to_string();
|
||||
.unwrap_or("happy")
|
||||
.to_string(),
|
||||
),
|
||||
weather: Set(
|
||||
data.get("weather")
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("sunny")
|
||||
.to_string(),
|
||||
),
|
||||
tags: Set(data.get("tags").cloned()),
|
||||
is_private: Set(
|
||||
data.get("is_private")
|
||||
.and_then(|v| v.as_bool())
|
||||
.unwrap_or(true),
|
||||
),
|
||||
shared_to_class: Set(false),
|
||||
assigned_topic_id: Set(None),
|
||||
created_at: Set(now),
|
||||
updated_at: Set(now),
|
||||
created_by: Set(user_id),
|
||||
updated_by: Set(user_id),
|
||||
deleted_at: Set(None),
|
||||
version: Set(1),
|
||||
};
|
||||
model.insert(db).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
let now = Utc::now();
|
||||
let model = journal_entry::ActiveModel {
|
||||
id: Set(id),
|
||||
tenant_id: Set(tenant_id),
|
||||
author_id: Set(user_id),
|
||||
class_id: Set(None),
|
||||
title: Set(title),
|
||||
date: Set(
|
||||
data.get("date")
|
||||
.and_then(|v| v.as_str())
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or_else(|| now.date_naive()),
|
||||
),
|
||||
mood: Set(
|
||||
data.get("mood")
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("happy")
|
||||
.to_string(),
|
||||
),
|
||||
weather: Set(
|
||||
data.get("weather")
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("sunny")
|
||||
.to_string(),
|
||||
),
|
||||
tags: Set(data.get("tags").cloned()),
|
||||
is_private: Set(
|
||||
data.get("is_private")
|
||||
.and_then(|v| v.as_bool())
|
||||
.unwrap_or(true),
|
||||
),
|
||||
shared_to_class: Set(false),
|
||||
assigned_topic_id: Set(None),
|
||||
created_at: Set(now),
|
||||
updated_at: Set(now),
|
||||
created_by: Set(user_id),
|
||||
updated_by: Set(user_id),
|
||||
deleted_at: Set(None),
|
||||
version: Set(1),
|
||||
};
|
||||
model.insert(db).await?;
|
||||
}
|
||||
SyncChange::UpdateJournal {
|
||||
id,
|
||||
version,
|
||||
data,
|
||||
} => {
|
||||
// 客户端更新 — 带版本检查
|
||||
let existing = journal_entry::Entity::find()
|
||||
.filter(journal_entry::Column::Id.eq(id))
|
||||
.filter(journal_entry::Column::TenantId.eq(tenant_id))
|
||||
.filter(journal_entry::Column::DeletedAt.is_null())
|
||||
.one(db)
|
||||
.await?
|
||||
.ok_or_else(|| DiaryError::NotFound(format!("日记 {} 不存在", id)))?;
|
||||
/// 应用更新(事务内,版本已预验证)
|
||||
async fn apply_update<C: ConnectionTrait>(
|
||||
user_id: Uuid,
|
||||
existing: journal_entry::Model,
|
||||
data: serde_json::Value,
|
||||
db: &C,
|
||||
) -> DiaryResult<()> {
|
||||
let now = Utc::now();
|
||||
let current_version = existing.version;
|
||||
let mut active: journal_entry::ActiveModel = existing.into();
|
||||
|
||||
if existing.version != version {
|
||||
return Err(DiaryError::VersionConflict {
|
||||
local: version,
|
||||
server: existing.version,
|
||||
});
|
||||
}
|
||||
|
||||
let now = Utc::now();
|
||||
let mut active: journal_entry::ActiveModel = existing.into();
|
||||
|
||||
if let Some(title) = data.get("title").and_then(|v| v.as_str()) {
|
||||
active.title = Set(title.to_string());
|
||||
}
|
||||
if let Some(mood) = data.get("mood").and_then(|v| v.as_str()) {
|
||||
active.mood = Set(mood.to_string());
|
||||
}
|
||||
if let Some(weather) = data.get("weather").and_then(|v| v.as_str()) {
|
||||
active.weather = Set(weather.to_string());
|
||||
}
|
||||
if let Some(tags) = data.get("tags").cloned() {
|
||||
active.tags = Set(Some(tags));
|
||||
}
|
||||
if let Some(is_private) = data.get("is_private").and_then(|v| v.as_bool()) {
|
||||
active.is_private = Set(is_private);
|
||||
}
|
||||
if let Some(shared) = data.get("shared_to_class").and_then(|v| v.as_bool()) {
|
||||
active.shared_to_class = Set(shared);
|
||||
}
|
||||
|
||||
active.updated_at = Set(now);
|
||||
active.updated_by = Set(user_id);
|
||||
active.version = Set(version + 1);
|
||||
active.update(db).await?;
|
||||
}
|
||||
SyncChange::DeleteJournal { id, version } => {
|
||||
// 客户端删除 — 软删除带版本检查
|
||||
let existing = journal_entry::Entity::find()
|
||||
.filter(journal_entry::Column::Id.eq(id))
|
||||
.filter(journal_entry::Column::TenantId.eq(tenant_id))
|
||||
.filter(journal_entry::Column::DeletedAt.is_null())
|
||||
.one(db)
|
||||
.await?
|
||||
.ok_or_else(|| DiaryError::NotFound(format!("日记 {} 不存在", id)))?;
|
||||
|
||||
if existing.version != version {
|
||||
return Err(DiaryError::VersionConflict {
|
||||
local: version,
|
||||
server: existing.version,
|
||||
});
|
||||
}
|
||||
|
||||
let now = Utc::now();
|
||||
let mut active: journal_entry::ActiveModel = existing.into();
|
||||
active.deleted_at = Set(Some(now));
|
||||
active.updated_at = Set(now);
|
||||
active.updated_by = Set(user_id);
|
||||
active.version = Set(version + 1);
|
||||
active.update(db).await?;
|
||||
}
|
||||
if let Some(title) = data.get("title").and_then(|v| v.as_str()) {
|
||||
active.title = Set(title.to_string());
|
||||
}
|
||||
if let Some(mood) = data.get("mood").and_then(|v| v.as_str()) {
|
||||
active.mood = Set(mood.to_string());
|
||||
}
|
||||
if let Some(weather) = data.get("weather").and_then(|v| v.as_str()) {
|
||||
active.weather = Set(weather.to_string());
|
||||
}
|
||||
if let Some(tags) = data.get("tags").cloned() {
|
||||
active.tags = Set(Some(tags));
|
||||
}
|
||||
if let Some(is_private) = data.get("is_private").and_then(|v| v.as_bool()) {
|
||||
active.is_private = Set(is_private);
|
||||
}
|
||||
if let Some(shared) = data.get("shared_to_class").and_then(|v| v.as_bool()) {
|
||||
active.shared_to_class = Set(shared);
|
||||
}
|
||||
|
||||
active.updated_at = Set(now);
|
||||
active.updated_by = Set(user_id);
|
||||
active.version = Set(current_version + 1);
|
||||
active.update(db).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// 应用软删除(事务内,版本已预验证)
|
||||
async fn apply_delete<C: ConnectionTrait>(
|
||||
user_id: Uuid,
|
||||
existing: journal_entry::Model,
|
||||
version: i32,
|
||||
db: &C,
|
||||
) -> DiaryResult<()> {
|
||||
let now = Utc::now();
|
||||
let mut active: journal_entry::ActiveModel = existing.into();
|
||||
active.deleted_at = Set(Some(now));
|
||||
active.updated_at = Set(now);
|
||||
active.updated_by = Set(user_id);
|
||||
active.version = Set(version + 1);
|
||||
active.update(db).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -276,7 +336,6 @@ mod tests {
|
||||
});
|
||||
let change = SyncChange::CreateJournal { data: data.clone() };
|
||||
|
||||
// 验证 match 可以正确提取 data
|
||||
match &change {
|
||||
SyncChange::CreateJournal { data } => {
|
||||
assert_eq!(data.get("title").unwrap().as_str().unwrap(), "我的日记");
|
||||
@@ -344,43 +403,40 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn conflict_collection_pattern_mimics_sync_logic() {
|
||||
// 模拟 sync 方法中 VersionConflict 收集到 conflicts 列表的行为
|
||||
// 模拟批量预查询后的冲突检测逻辑
|
||||
let mut conflicts: Vec<ConflictInfo> = Vec::new();
|
||||
|
||||
let errors: Vec<DiaryError> = vec![
|
||||
DiaryError::VersionConflict {
|
||||
local: 1,
|
||||
server: 3,
|
||||
},
|
||||
DiaryError::VersionConflict {
|
||||
local: 2,
|
||||
server: 5,
|
||||
},
|
||||
];
|
||||
// 模拟预取的 existing_map: id → (version)
|
||||
let id1 = uuid::Uuid::now_v7();
|
||||
let id2 = uuid::Uuid::now_v7();
|
||||
let existing: HashMap<Uuid, i32> = [(id1, 3), (id2, 5)].into_iter().collect();
|
||||
|
||||
for e in errors {
|
||||
match e {
|
||||
DiaryError::VersionConflict { local, server } => {
|
||||
// 客户端变更
|
||||
let updates: Vec<(Uuid, i32)> = vec![(id1, 1), (id2, 5)];
|
||||
|
||||
for (id, local_ver) in updates {
|
||||
match existing.get(&id) {
|
||||
Some(&server_ver) if server_ver == local_ver => {
|
||||
// 版本匹配 — 有效更新
|
||||
}
|
||||
Some(&server_ver) => {
|
||||
conflicts.push(ConflictInfo {
|
||||
journal_id: uuid::Uuid::nil(),
|
||||
local_version: local,
|
||||
server_version: server,
|
||||
journal_id: id,
|
||||
local_version: local_ver,
|
||||
server_version: server_ver,
|
||||
});
|
||||
}
|
||||
_ => {}
|
||||
None => {}
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(conflicts.len(), 2);
|
||||
assert_eq!(conflicts.len(), 1);
|
||||
assert_eq!(conflicts[0].local_version, 1);
|
||||
assert_eq!(conflicts[0].server_version, 3);
|
||||
assert_eq!(conflicts[1].local_version, 2);
|
||||
assert_eq!(conflicts[1].server_version, 5);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn non_conflict_error_does_not_collect() {
|
||||
// 验证非 VersionConflict 错误不会被收集到 conflicts 列表
|
||||
let mut conflicts: Vec<ConflictInfo> = Vec::new();
|
||||
let error = DiaryError::NotFound("日记不存在".to_string());
|
||||
|
||||
@@ -392,11 +448,63 @@ mod tests {
|
||||
server_version: server,
|
||||
});
|
||||
}
|
||||
_ => {
|
||||
// 其他错误不应收集
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
assert!(conflicts.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn batch_partition_correctly_separates_types() {
|
||||
let changes = vec![
|
||||
SyncChange::CreateJournal {
|
||||
data: serde_json::json!({"title": "日记1"}),
|
||||
},
|
||||
SyncChange::UpdateJournal {
|
||||
id: uuid::Uuid::nil(),
|
||||
version: 1,
|
||||
data: serde_json::json!({"title": "更新"}),
|
||||
},
|
||||
SyncChange::DeleteJournal {
|
||||
id: uuid::Uuid::nil(),
|
||||
version: 2,
|
||||
},
|
||||
SyncChange::CreateJournal {
|
||||
data: serde_json::json!({"title": "日记2"}),
|
||||
},
|
||||
];
|
||||
|
||||
let mut creates = Vec::new();
|
||||
let mut updates = Vec::new();
|
||||
let mut deletes = Vec::new();
|
||||
|
||||
for change in changes {
|
||||
match change {
|
||||
SyncChange::CreateJournal { data } => creates.push(data),
|
||||
SyncChange::UpdateJournal { id, version, data } => {
|
||||
updates.push((id, version, data));
|
||||
}
|
||||
SyncChange::DeleteJournal { id, version } => deletes.push((id, version)),
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(creates.len(), 2);
|
||||
assert_eq!(updates.len(), 1);
|
||||
assert_eq!(deletes.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn version_match_detection() {
|
||||
// 模拟预取数据中的版本匹配
|
||||
let id = uuid::Uuid::now_v7();
|
||||
let existing: HashMap<Uuid, i32> = [(id, 5)].into_iter().collect();
|
||||
|
||||
// 版本匹配
|
||||
let matched = existing.get(&id).map_or(false, |&sv| sv == 5);
|
||||
assert!(matched);
|
||||
|
||||
// 版本不匹配
|
||||
let mismatched = existing.get(&id).map_or(false, |&sv| sv == 3);
|
||||
assert!(!mismatched);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user