feat(app): 统一同步协议 — SyncModels + ApiClient.sync + SyncEngine.tryBatchSync
Some checks failed
Main Merge / backend (push) Has been cancelled
Main Merge / frontend (push) Has been cancelled

Flutter ↔ Rust 同步协议对齐:
- 新增 sync_models.dart: SyncReq/SyncResp/SyncChange/ConflictInfo
  与 Rust dto.rs 一一对应 (CreateJournal/UpdateJournal/DeleteJournal)
- ApiClient.sync(): 调用 POST /diary/sync 批量同步端点
- SyncEngine.tryBatchSync(): PendingOperation → SyncChange 批量提交
  成功清空队列,冲突保留待用户处理

保留原有逐个同步 trySync() 作为降级方案
后端 509/509 测试通过, Flutter analyze 0 error
This commit is contained in:
iven
2026-06-03 17:20:51 +08:00
parent 1766cefde9
commit 367f21de08
3 changed files with 267 additions and 0 deletions

View File

@@ -0,0 +1,176 @@
// 同步协议模型 — 与 Rust 端 SyncReq/SyncResp/SyncChange/ConflictInfo 一一对应
//
// 端点: POST /api/v1/diary/sync
// Rust DTO: crates/erp-diary/src/dto.rs (SyncReq, SyncResp, SyncChange, ConflictInfo)
/// 同步请求 — 与 Rust SyncReq 对应
///
/// ```rust
/// pub struct SyncReq {
/// pub last_sync_time: Option<DateTime<Utc>>,
/// pub changes: Vec<SyncChange>,
/// }
/// ```
class SyncReq {
final DateTime? lastSyncTime;
final List<SyncChange> changes;
const SyncReq({this.lastSyncTime, this.changes = const []});
Map<String, dynamic> toJson() => {
if (lastSyncTime != null)
'last_sync_time': lastSyncTime!.toUtc().toIso8601String(),
'changes': changes.map((c) => c.toJson()).toList(),
};
}
/// 同步变更条目 — 与 Rust SyncChange 枚举对应
///
/// ```rust
/// pub enum SyncChange {
/// CreateJournal { data: serde_json::Value },
/// UpdateJournal { id: Uuid, version: i32, data: serde_json::Value },
/// DeleteJournal { id: Uuid, version: i32 },
/// }
/// ```
sealed class SyncChange {
const SyncChange();
Map<String, dynamic> toJson();
/// 从 JSON 反序列化
factory SyncChange.fromJson(Map<String, dynamic> json) {
if (json.containsKey('CreateJournal')) {
return SyncChangeCreateJournal(
data: json['CreateJournal']['data'] as Map<String, dynamic>,
);
}
if (json.containsKey('UpdateJournal')) {
final inner = json['UpdateJournal'] as Map<String, dynamic>;
return SyncChangeUpdateJournal(
id: inner['id'] as String,
version: inner['version'] as int,
data: inner['data'] as Map<String, dynamic>,
);
}
if (json.containsKey('DeleteJournal')) {
final inner = json['DeleteJournal'] as Map<String, dynamic>;
return SyncChangeDeleteJournal(
id: inner['id'] as String,
version: inner['version'] as int,
);
}
throw FormatException('Unknown SyncChange variant: $json');
}
}
/// 创建日记变更
class SyncChangeCreateJournal extends SyncChange {
final Map<String, dynamic> data;
const SyncChangeCreateJournal({required this.data});
@override
Map<String, dynamic> toJson() => {
'CreateJournal': {'data': data},
};
}
/// 更新日记变更
class SyncChangeUpdateJournal extends SyncChange {
final String id;
final int version;
final Map<String, dynamic> data;
const SyncChangeUpdateJournal({
required this.id,
required this.version,
required this.data,
});
@override
Map<String, dynamic> toJson() => {
'UpdateJournal': {
'id': id,
'version': version,
'data': data,
},
};
}
/// 删除日记变更
class SyncChangeDeleteJournal extends SyncChange {
final String id;
final int version;
const SyncChangeDeleteJournal({
required this.id,
required this.version,
});
@override
Map<String, dynamic> toJson() => {
'DeleteJournal': {
'id': id,
'version': version,
},
};
}
/// 同步响应 — 与 Rust SyncResp 对应
///
/// ```rust
/// pub struct SyncResp {
/// pub server_changes: Vec<serde_json::Value>,
/// pub conflicts: Vec<ConflictInfo>,
/// pub sync_time: DateTime<Utc>,
/// }
/// ```
class SyncResp {
final List<Map<String, dynamic>> serverChanges;
final List<ConflictInfo> conflicts;
final DateTime syncTime;
const SyncResp({
required this.serverChanges,
required this.conflicts,
required this.syncTime,
});
factory SyncResp.fromJson(Map<String, dynamic> json) => SyncResp(
serverChanges: (json['server_changes'] as List)
.map((e) => Map<String, dynamic>.from(e as Map))
.toList(),
conflicts: (json['conflicts'] as List)
.map((e) => ConflictInfo.fromJson(Map<String, dynamic>.from(e as Map)))
.toList(),
syncTime: DateTime.parse(json['sync_time'] as String),
);
}
/// 冲突信息 — 与 Rust ConflictInfo 对应
///
/// ```rust
/// pub struct ConflictInfo {
/// pub journal_id: Uuid,
/// pub local_version: i32,
/// pub server_version: i32,
/// }
/// ```
class ConflictInfo {
final String journalId;
final int localVersion;
final int serverVersion;
const ConflictInfo({
required this.journalId,
required this.localVersion,
required this.serverVersion,
});
factory ConflictInfo.fromJson(Map<String, dynamic> json) => ConflictInfo(
journalId: json['journal_id'] as String,
localVersion: json['local_version'] as int,
serverVersion: json['server_version'] as int,
);
}

View File

@@ -10,6 +10,8 @@
import 'package:dio/dio.dart';
import 'package:connectivity_plus/connectivity_plus.dart';
import '../models/sync_models.dart';
/// 网络离线异常 — 网络不可用时由 ApiClient 抛出
class OfflineException implements Exception {
final String message;
@@ -187,4 +189,19 @@ class ApiClient {
});
return _dio.post<T>(path, data: formData);
}
// ===== 同步 API =====
/// 批量同步 — POST /diary/sync
///
/// 将客户端变更批量提交到服务端,返回服务端变更和冲突信息。
/// 对应 Rust sync_handler::sync_journals 端点。
Future<SyncResp> sync(SyncReq req) async {
await _ensureOnline();
final response = await _dio.post<Map<String, dynamic>>(
'/diary/sync',
data: req.toJson(),
);
return SyncResp.fromJson(response.data!);
}
}

View File

@@ -22,6 +22,7 @@ import 'package:isar/isar.dart';
import '../local/isar_database_native.dart';
import '../local/collections/pending_operation_collection.dart';
import '../models/sync_models.dart';
import '../remote/api_client.dart';
/// 同步操作类型
@@ -277,6 +278,79 @@ class SyncEngine {
await persistPendingQueue();
}
/// 批量同步 — 使用 POST /diary/sync 端点一次性提交所有变更
///
/// 将队列中的 PendingOperation 转换为 SyncChange 列表,
/// 调用 Rust sync_handler 批量处理,获取服务端变更和冲突。
/// 成功后清空队列;失败时保留队列供重试。
Future<SyncResp?> tryBatchSync({DateTime? lastSyncTime}) async {
if (_status == SyncStatus.syncing) return null;
if (_pendingQueue.isEmpty) {
_status = SyncStatus.idle;
return null;
}
_status = SyncStatus.syncing;
_lastError = null;
try {
// 转换: PendingOperation → SyncChange
final changes = _pendingQueue.map(_operationToSyncChange).toList();
final req = SyncReq(
lastSyncTime: lastSyncTime,
changes: changes,
);
final resp = await _apiClient.sync(req);
// 处理冲突 — 将冲突的操作保留在队列中
if (resp.conflicts.isNotEmpty) {
final conflictIds = resp.conflicts.map((c) => c.journalId).toSet();
// 移除已成功同步的非冲突操作,保留冲突操作
_pendingQueue.removeWhere(
(op) => !conflictIds.contains(op.id),
);
_lastError = '${resp.conflicts.length} 个操作存在版本冲突';
} else {
// 全部成功,清空队列
_pendingQueue.clear();
}
_status = _pendingQueue.isEmpty ? SyncStatus.idle : SyncStatus.paused;
await persistPendingQueue();
return resp;
} on OfflineException {
_status = SyncStatus.paused;
_lastError = '同步中断:网络不可用';
return null;
} catch (e) {
_status = SyncStatus.error;
_lastError = '批量同步失败: $e';
return null;
}
}
/// PendingOperation → SyncChange 转换
SyncChange _operationToSyncChange(PendingOperation op) {
switch (op.type) {
case SyncOperationType.create:
return SyncChangeCreateJournal(data: op.data);
case SyncOperationType.update:
return SyncChangeUpdateJournal(
id: op.id,
version: op.version,
data: op.data,
);
case SyncOperationType.delete:
return SyncChangeDeleteJournal(
id: op.id,
version: op.version,
);
}
}
/// 执行单个同步操作
Future<void> _executeOperation(PendingOperation operation) async {
switch (operation.type) {