// 同步引擎 — WiFi 增量同步 + 操作队列 + Isar 持久化 // // 设计思路: // - 所有本地修改先入队 [PendingOperation] // - 网络恢复时自动批量同步 // - 版本号冲突检测,Phase 1 使用"本地优先"策略 // - 最大重试次数限制,超过后标记为冲突供用户手动解决 // - 队列持久化到 Isar,应用退出后不丢失 // // Phase 1 策略:本地优先 // - 离线时正常使用,操作入队等待 // - 联网后自动推送待同步操作 // - 版本冲突时本地版本覆盖远端(简单策略) import 'dart:async'; import 'dart:convert'; import 'dart:collection'; import 'package:connectivity_plus/connectivity_plus.dart'; import 'package:flutter/foundation.dart'; import 'package:isar/isar.dart'; import '../local/isar_database.dart'; import '../local/collections/pending_operation_collection.dart'; import '../remote/api_client.dart'; /// 同步操作类型 enum SyncOperationType { create('POST'), update('PUT'), delete('DELETE'); const SyncOperationType(this.httpMethod); final String httpMethod; } /// 同步状态 enum SyncStatus { idle, // 空闲,无待同步操作 syncing, // 正在同步 paused, // 暂停(网络不可用) error, // 出错,需要重试 } /// 待同步操作 — 记录一次本地修改 class PendingOperation { final String id; final SyncOperationType type; final String endpoint; final Map data; final int version; final DateTime createdAt; final int retryCount; /// 最大重试次数 static const int maxRetryCount = 5; const PendingOperation({ required this.id, required this.type, required this.endpoint, required this.data, required this.version, required this.createdAt, this.retryCount = 0, }); PendingOperation copyWith({ String? id, SyncOperationType? type, String? endpoint, Map? data, int? version, DateTime? createdAt, int? retryCount, }) => PendingOperation( id: id ?? this.id, type: type ?? this.type, endpoint: endpoint ?? this.endpoint, data: data ?? this.data, version: version ?? this.version, createdAt: createdAt ?? this.createdAt, retryCount: retryCount ?? this.retryCount, ); /// 是否已超过最大重试次数 bool get isExhausted => retryCount >= maxRetryCount; } /// 同步引擎 — 管理 WiFi 增量同步和操作队列 /// /// 使用方式: /// ```dart /// final engine = SyncEngine(apiClient: apiClient); /// /// // 启动时恢复持久化队列 /// await engine.restorePendingQueue(); /// /// // 本地修改后入队 /// engine.enqueue(PendingOperation( /// id: 'op-1', /// type: SyncOperationType.create, /// endpoint: '/diary/entries', /// data: entry.toJson(), /// version: 1, /// createdAt: DateTime.now(), /// )); /// /// // 网络恢复时触发同步 /// await engine.trySync(); /// /// // 应用退出时持久化 /// await engine.persistPendingQueue(); /// ``` class SyncEngine { final ApiClient _apiClient; final Queue _pendingQueue = Queue(); StreamSubscription>? _connectivitySub; SyncStatus _status = SyncStatus.idle; String? _lastError; SyncEngine({required ApiClient apiClient}) : _apiClient = apiClient; /// 当前同步状态 SyncStatus get status => _status; /// 最近一次错误信息 String? get lastError => _lastError; /// 待同步操作数量 int get pendingCount => _pendingQueue.length; /// 是否有操作正在同步 bool get isSyncing => _status == SyncStatus.syncing; /// 添加待同步操作到队列尾部 void enqueue(PendingOperation operation) { _pendingQueue.add(operation); if (_status == SyncStatus.idle) { _status = SyncStatus.paused; } } /// 批量添加待同步操作 void enqueueAll(List operations) { for (final op in operations) { _pendingQueue.add(op); } if (_status == SyncStatus.idle && _pendingQueue.isNotEmpty) { _status = SyncStatus.paused; } } /// 检查网络状态并尝试同步全部待处理操作 /// /// 同步策略: /// 1. 检查网络是否可用 /// 2. 按先进先出顺序处理队列 /// 3. 每个操作最多重试 [PendingOperation.maxRetryCount] 次 /// 4. 超过重试次数的操作标记为冲突,移出队列 /// 5. 网络中断时暂停同步,保留剩余操作 Future trySync() async { if (_status == SyncStatus.syncing) return; // 防止重入 if (_pendingQueue.isEmpty) { _status = SyncStatus.idle; return; } // 检查网络 final connectivity = Connectivity(); final result = await connectivity.checkConnectivity(); final isOnline = result.any((r) => r != ConnectivityResult.none); if (!isOnline) { _status = SyncStatus.paused; _lastError = '网络不可用'; return; } // WiFi 优先策略:仅在 WiFi 下自动同步(Phase 1 简化) // TODO: 添加用户设置允许蜂窝数据同步 _status = SyncStatus.syncing; _lastError = null; while (_pendingQueue.isNotEmpty) { final operation = _pendingQueue.removeFirst(); try { await _executeOperation(operation); } on OfflineException { // 网络中断,操作放回队列头部 _pendingQueue.addFirst(operation); _status = SyncStatus.paused; _lastError = '同步中断:网络不可用'; return; } catch (e) { debugPrint('SyncEngine.trySync 操作失败: $e'); // 操作失败,增加重试计数 final retried = operation.copyWith(retryCount: operation.retryCount + 1); if (retried.isExhausted) { // 超过最大重试次数,标记为冲突(Phase 1 简化:丢弃) // TODO: Phase 2 将冲突操作持久化,提供 UI 让用户手动解决 _lastError = '操作同步失败(已耗尽重试次数): ${operation.endpoint}'; continue; } // 放回队列头部,下次重试 _pendingQueue.addFirst(retried); _status = SyncStatus.error; _lastError = '同步失败: $e'; return; } } // 全部同步完成,更新持久化 _status = SyncStatus.idle; _lastError = null; await persistPendingQueue(); } /// 执行单个同步操作 Future _executeOperation(PendingOperation operation) async { switch (operation.type) { case SyncOperationType.create: await _apiClient.post(operation.endpoint, data: operation.data); case SyncOperationType.update: await _apiClient.put(operation.endpoint, data: operation.data); case SyncOperationType.delete: await _apiClient.delete(operation.endpoint); } } /// 清空队列(数据已全部同步完成或需要强制清空时调用) void clear() { _pendingQueue.clear(); _status = SyncStatus.idle; _lastError = null; } /// 获取当前队列中所有操作的快照(用于持久化到本地存储) /// /// 应用退出时调用此方法,将待同步操作保存到 Isar, /// 下次启动时通过 [restorePendingQueue] 恢复。 List get snapshot => _pendingQueue.toList(); // ============================================================ // Isar 持久化 // ============================================================ /// 将当前内存队列持久化到 Isar /// /// 替换策略:先清空旧的持久化数据,再写入当前队列。 /// 在 app 退出、isolate 暂停、或同步完成后调用。 Future persistPendingQueue() async { if (!IsarDatabase.isAvailable) return; final isar = IsarDatabase.instance!; final ops = snapshot; await isar.writeTxn(() async { // 清空旧数据 await isar.pendingOperationCollections.clear(); // 写入当前队列 for (final op in ops) { final col = _operationToCollection(op); await isar.pendingOperationCollections.put(col); } }); } /// 从 Isar 恢复持久化队列到内存 /// /// 在 app 启动时调用,恢复上次退出时未同步的操作。 /// Web 平台上 Isar 不可用,跳过恢复。 Future restorePendingQueue() async { if (!IsarDatabase.isAvailable) return; final isar = IsarDatabase.instance!; final persisted = await isar.pendingOperationCollections .where() .anyIsarId() .findAll(); for (final col in persisted) { final op = _collectionToOperation(col); _pendingQueue.add(op); } if (_pendingQueue.isNotEmpty && _status == SyncStatus.idle) { _status = SyncStatus.paused; } } /// 启动网络监听 — 网络恢复时自动触发同步 /// /// 在 app.dart 中创建 SyncEngine 后调用一次。 /// 调用 [dispose] 停止监听。 void startAutoSync() { _connectivitySub = Connectivity().onConnectivityChanged.listen((result) { final isOnline = result.any((r) => r != ConnectivityResult.none); if (isOnline && _pendingQueue.isNotEmpty && _status != SyncStatus.syncing) { debugPrint('SyncEngine: 网络恢复,开始同步 ${_pendingQueue.length} 个操作'); trySync(); } }); } /// 停止网络监听并清理资源 void dispose() { _connectivitySub?.cancel(); _connectivitySub = null; } // ============================================================ // 转换函数 // ============================================================ /// PendingOperation → PendingOperationCollection PendingOperationCollection _operationToCollection(PendingOperation op) { return PendingOperationCollection() ..id = op.id ..operationType = op.type.httpMethod ..endpoint = op.endpoint ..dataJson = _encodeJson(op.data) ..version = op.version ..createdAtEpoch = op.createdAt.millisecondsSinceEpoch ..retryCount = op.retryCount; } /// PendingOperationCollection → PendingOperation PendingOperation _collectionToOperation(PendingOperationCollection col) { return PendingOperation( id: col.id, type: SyncOperationType.values.firstWhere( (t) => t.httpMethod == col.operationType, orElse: () => SyncOperationType.create, ), endpoint: col.endpoint, data: _decodeJson(col.dataJson), version: col.version, createdAt: DateTime.fromMillisecondsSinceEpoch(col.createdAtEpoch), retryCount: col.retryCount, ); } /// 安全编码 JSON String _encodeJson(Map data) { try { return jsonEncode(data); } catch (_) { return '{}'; } } /// 安全解码 JSON Map _decodeJson(String json) { try { return jsonDecode(json) as Map; } catch (_) { return {}; } } }