Files
nj/app/lib/data/services/sync_engine_native.dart
iven bb388ed8ff
Some checks failed
Main Merge / backend (push) Has been cancelled
Main Merge / frontend (push) Has been cancelled
fix(app): 日记可见性修复 — 私密日记仅本地 + Web 端 ID 修复 + 分享按钮
问题修复:
1. Web端保存的日记看不到:createJournal 返回值未捕获,server ID 丢失导致
   后续元素保存用错 ID。现在使用 saved.id 贯穿全部操作。
2. 管理端看不到新建日记:后端 list_journals 添加 is_private 过滤,admin/teacher
   查看他人日记时排除私密日记。
3. RemoteJournalRepository 添加 onJournalChanged 变更通知流,HomeBloc 可自动刷新。
4. SyncEngine(native + web)enqueue 添加 is_private 防御性检查,私密日记不入队。
5. 编辑器 _persistState 条件入队:仅非私密日记同步到后端。
6. 分享流程改造:首次从私密变为公开时入队 create 操作上传。
7. 日记卡片添加可见性标签(仅自己可见/班级可见/公开),私密日记可点击分享。
8. 首页 _sharePrivateJournal 弹出 ShareBottomSheet 主动分享。
2026-06-04 12:03:24 +08:00

505 lines
15 KiB
Dart
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// 同步引擎 — WiFi 增量同步 + 操作队列 + Isar 持久化
//
// 设计思路:
// - 所有本地修改先入队 [PendingOperation]
// - 网络恢复时自动批量同步
// - 版本号冲突检测Phase 1 使用"本地优先"策略
// - 最大重试次数限制,超过后标记为冲突供用户手动解决
// - 队列持久化到 Isar应用退出后不丢失
//
// Phase 1 策略:本地优先
// - 离线时正常使用,操作入队等待
// - 联网后自动推送待同步操作
// - 版本冲突时本地版本覆盖远端(简单策略)
import 'dart:async';
import 'dart:convert';
import 'dart:collection';
import 'package:connectivity_plus/connectivity_plus.dart';
import 'package:flutter/foundation.dart';
import 'package:isar/isar.dart';
import '../local/isar_database_native.dart';
import '../local/collections/pending_operation_collection.dart';
import '../models/sync_models.dart';
import '../remote/api_client.dart';
/// 同步操作类型
enum SyncOperationType {
create('POST'),
update('PUT'),
delete('DELETE');
const SyncOperationType(this.httpMethod);
final String httpMethod;
}
/// 同步状态
enum SyncStatus {
idle, // 空闲,无待同步操作
syncing, // 正在同步
paused, // 暂停(网络不可用)
error, // 出错,需要重试
}
/// 待同步操作 — 记录一次本地修改
class PendingOperation {
final String id;
final SyncOperationType type;
final String endpoint;
final Map<String, dynamic> data;
final int version;
final DateTime createdAt;
final int retryCount;
/// 最大重试次数
static const int maxRetryCount = 5;
const PendingOperation({
required this.id,
required this.type,
required this.endpoint,
required this.data,
required this.version,
required this.createdAt,
this.retryCount = 0,
});
PendingOperation copyWith({
String? id,
SyncOperationType? type,
String? endpoint,
Map<String, dynamic>? data,
int? version,
DateTime? createdAt,
int? retryCount,
}) =>
PendingOperation(
id: id ?? this.id,
type: type ?? this.type,
endpoint: endpoint ?? this.endpoint,
data: data ?? this.data,
version: version ?? this.version,
createdAt: createdAt ?? this.createdAt,
retryCount: retryCount ?? this.retryCount,
);
/// 是否已超过最大重试次数
bool get isExhausted => retryCount >= maxRetryCount;
}
/// 同步引擎 — 管理 WiFi 增量同步和操作队列
///
/// 使用方式:
/// ```dart
/// final engine = SyncEngine(apiClient: apiClient);
///
/// // 启动时恢复持久化队列
/// await engine.restorePendingQueue();
///
/// // 本地修改后入队
/// engine.enqueue(PendingOperation(
/// id: 'op-1',
/// type: SyncOperationType.create,
/// endpoint: '/diary/entries',
/// data: entry.toJson(),
/// version: 1,
/// createdAt: DateTime.now(),
/// ));
///
/// // 网络恢复时触发同步
/// await engine.trySync();
///
/// // 应用退出时持久化
/// await engine.persistPendingQueue();
/// ```
class SyncEngine {
final ApiClient _apiClient;
final Queue<PendingOperation> _pendingQueue = Queue();
StreamSubscription<List<ConnectivityResult>>? _connectivitySub;
SyncStatus _status = SyncStatus.idle;
String? _lastError;
SyncEngine({required ApiClient apiClient}) : _apiClient = apiClient;
/// 当前同步状态
SyncStatus get status => _status;
/// 最近一次错误信息
String? get lastError => _lastError;
/// 待同步操作数量
int get pendingCount => _pendingQueue.length;
/// 是否有操作正在同步
bool get isSyncing => _status == SyncStatus.syncing;
/// 添加待同步操作到队列尾部
///
/// 合并策略8b-N01同一资源endpoint 相同)的连续操作只保留最新一条。
/// create+update → create使用最新数据
/// update+update → update使用最新数据
/// update+delete → delete资源最终被删除
/// create+delete → 取消(资源从未存在)
///
/// 私密日记is_private=true不入队 — 仅保存在本地,不上传后端。
void enqueue(PendingOperation operation) {
// 防御性检查:私密日记不入队
final isPrivate = operation.data['is_private'] as bool? ?? false;
if (isPrivate) {
debugPrint('SyncEngine.enqueue: 跳过私密日记 ${operation.id}');
return;
}
// 查找队列中同一资源的最后一个操作
PendingOperation? existing;
for (final op in _pendingQueue) {
if (op.endpoint == operation.endpoint) {
existing = op;
}
}
if (existing != null) {
final merged = _mergeOperations(existing, operation);
_pendingQueue.remove(existing);
if (merged != null) {
_pendingQueue.add(merged);
}
// merged == null → create+delete 取消,不添加
} else {
_pendingQueue.add(operation);
}
if (_status == SyncStatus.idle) {
_status = SyncStatus.paused;
}
}
/// 批量添加待同步操作(每个操作独立走合并逻辑)
void enqueueAll(List<PendingOperation> operations) {
for (final op in operations) {
enqueue(op);
}
}
/// 合并同一资源的两个操作
///
/// 返回合并后的操作,或 null 表示应取消create+delete
PendingOperation? _mergeOperations(
PendingOperation existing,
PendingOperation incoming,
) {
// create + delete → 取消(资源从未同步到服务端)
if (existing.type == SyncOperationType.create &&
incoming.type == SyncOperationType.delete) {
return null;
}
// create + update → create使用最新数据
if (existing.type == SyncOperationType.create &&
incoming.type == SyncOperationType.update) {
return existing.copyWith(data: incoming.data, version: incoming.version);
}
// update + update → update使用最新数据
if (existing.type == SyncOperationType.update &&
incoming.type == SyncOperationType.update) {
return incoming;
}
// update + delete → delete
if (existing.type == SyncOperationType.update &&
incoming.type == SyncOperationType.delete) {
return incoming;
}
// 其他组合delete+create, create+create 等)不合并
return incoming;
}
/// 检查网络状态并尝试同步全部待处理操作
///
/// 同步策略:
/// 1. 检查网络是否可用
/// 2. 按先进先出顺序处理队列
/// 3. 每个操作最多重试 [PendingOperation.maxRetryCount] 次
/// 4. 超过重试次数的操作标记为冲突,移出队列
/// 5. 网络中断时暂停同步,保留剩余操作
Future<void> trySync() async {
if (_status == SyncStatus.syncing) return; // 防止重入
if (_pendingQueue.isEmpty) {
_status = SyncStatus.idle;
return;
}
// 检查网络
final connectivity = Connectivity();
final result = await connectivity.checkConnectivity();
final isOnline = result.any((r) => r != ConnectivityResult.none);
if (!isOnline) {
_status = SyncStatus.paused;
_lastError = '网络不可用';
return;
}
// WiFi 优先策略:仅在 WiFi 下自动同步Phase 1 简化)
// TODO: 添加用户设置允许蜂窝数据同步
_status = SyncStatus.syncing;
_lastError = null;
while (_pendingQueue.isNotEmpty) {
final operation = _pendingQueue.removeFirst();
try {
await _executeOperation(operation);
} on OfflineException {
// 网络中断,操作放回队列头部
_pendingQueue.addFirst(operation);
_status = SyncStatus.paused;
_lastError = '同步中断:网络不可用';
return;
} catch (e) {
debugPrint('SyncEngine.trySync 操作失败: $e');
// 操作失败,增加重试计数
final retried = operation.copyWith(retryCount: operation.retryCount + 1);
if (retried.isExhausted) {
// 超过最大重试次数标记为冲突Phase 1 简化:丢弃)
// TODO: Phase 2 将冲突操作持久化,提供 UI 让用户手动解决
_lastError = '操作同步失败(已耗尽重试次数): ${operation.endpoint}';
continue;
}
// 放回队列头部,下次重试
_pendingQueue.addFirst(retried);
_status = SyncStatus.error;
_lastError = '同步失败: $e';
return;
}
}
// 全部同步完成,更新持久化
_status = SyncStatus.idle;
_lastError = null;
await persistPendingQueue();
}
/// 批量同步 — 使用 POST /diary/sync 端点一次性提交所有变更
///
/// 将队列中的 PendingOperation 转换为 SyncChange 列表,
/// 调用 Rust sync_handler 批量处理,获取服务端变更和冲突。
/// 成功后清空队列;失败时保留队列供重试。
Future<SyncResp?> tryBatchSync({DateTime? lastSyncTime}) async {
if (_status == SyncStatus.syncing) return null;
if (_pendingQueue.isEmpty) {
_status = SyncStatus.idle;
return null;
}
_status = SyncStatus.syncing;
_lastError = null;
try {
// 转换: PendingOperation → SyncChange
final changes = _pendingQueue.map(_operationToSyncChange).toList();
final req = SyncReq(
lastSyncTime: lastSyncTime,
changes: changes,
);
final resp = await _apiClient.sync(req);
// 处理冲突 — 将冲突的操作保留在队列中
if (resp.conflicts.isNotEmpty) {
final conflictIds = resp.conflicts.map((c) => c.journalId).toSet();
// 移除已成功同步的非冲突操作,保留冲突操作
_pendingQueue.removeWhere(
(op) => !conflictIds.contains(op.id),
);
_lastError = '${resp.conflicts.length} 个操作存在版本冲突';
} else {
// 全部成功,清空队列
_pendingQueue.clear();
}
_status = _pendingQueue.isEmpty ? SyncStatus.idle : SyncStatus.paused;
await persistPendingQueue();
return resp;
} on OfflineException {
_status = SyncStatus.paused;
_lastError = '同步中断:网络不可用';
return null;
} catch (e) {
_status = SyncStatus.error;
_lastError = '批量同步失败: $e';
return null;
}
}
/// PendingOperation → SyncChange 转换
SyncChange _operationToSyncChange(PendingOperation op) {
switch (op.type) {
case SyncOperationType.create:
return SyncChangeCreateJournal(data: op.data);
case SyncOperationType.update:
return SyncChangeUpdateJournal(
id: op.id,
version: op.version,
data: op.data,
);
case SyncOperationType.delete:
return SyncChangeDeleteJournal(
id: op.id,
version: op.version,
);
}
}
/// 执行单个同步操作
Future<void> _executeOperation(PendingOperation operation) async {
switch (operation.type) {
case SyncOperationType.create:
await _apiClient.post(operation.endpoint, data: operation.data);
case SyncOperationType.update:
await _apiClient.put(operation.endpoint, data: operation.data);
case SyncOperationType.delete:
await _apiClient.delete(operation.endpoint);
}
}
/// 清空队列(数据已全部同步完成或需要强制清空时调用)
void clear() {
_pendingQueue.clear();
_status = SyncStatus.idle;
_lastError = null;
}
/// 获取当前队列中所有操作的快照(用于持久化到本地存储)
///
/// 应用退出时调用此方法,将待同步操作保存到 Isar
/// 下次启动时通过 [restorePendingQueue] 恢复。
List<PendingOperation> get snapshot => _pendingQueue.toList();
// ============================================================
// Isar 持久化
// ============================================================
/// 将当前内存队列持久化到 Isar
///
/// 替换策略:先清空旧的持久化数据,再写入当前队列。
/// 在 app 退出、isolate 暂停、或同步完成后调用。
Future<void> persistPendingQueue() async {
if (!IsarDatabase.isAvailable) return;
final isar = IsarDatabase.instance;
final ops = snapshot;
await isar.writeTxn(() async {
// 清空旧数据
await isar.pendingOperationCollections.clear();
// 写入当前队列
for (final op in ops) {
final col = _operationToCollection(op);
await isar.pendingOperationCollections.put(col);
}
});
}
/// 从 Isar 恢复持久化队列到内存
///
/// 在 app 启动时调用,恢复上次退出时未同步的操作。
/// Web 平台上 Isar 不可用,跳过恢复。
Future<void> restorePendingQueue() async {
if (!IsarDatabase.isAvailable) return;
final isar = IsarDatabase.instance;
final persisted = await isar.pendingOperationCollections
.where()
.anyIsarId()
.findAll();
for (final col in persisted) {
final op = _collectionToOperation(col);
_pendingQueue.add(op);
}
if (_pendingQueue.isNotEmpty && _status == SyncStatus.idle) {
_status = SyncStatus.paused;
}
}
/// 启动网络监听 — 网络恢复时自动触发同步
///
/// 在 app.dart 中创建 SyncEngine 后调用一次。
/// 调用 [dispose] 停止监听。
void startAutoSync() {
_connectivitySub = Connectivity().onConnectivityChanged.listen((result) {
final isOnline = result.any((r) => r != ConnectivityResult.none);
if (isOnline && _pendingQueue.isNotEmpty && _status != SyncStatus.syncing) {
debugPrint('SyncEngine: 网络恢复,开始同步 ${_pendingQueue.length} 个操作');
trySync();
}
});
}
/// 停止网络监听并清理资源
void dispose() {
_connectivitySub?.cancel();
_connectivitySub = null;
}
// ============================================================
// 转换函数
// ============================================================
/// PendingOperation → PendingOperationCollection
PendingOperationCollection _operationToCollection(PendingOperation op) {
return PendingOperationCollection()
..id = op.id
..operationType = op.type.httpMethod
..endpoint = op.endpoint
..dataJson = _encodeJson(op.data)
..version = op.version
..createdAtEpoch = op.createdAt.millisecondsSinceEpoch
..retryCount = op.retryCount;
}
/// PendingOperationCollection → PendingOperation
PendingOperation _collectionToOperation(PendingOperationCollection col) {
return PendingOperation(
id: col.id,
type: SyncOperationType.values.firstWhere(
(t) => t.httpMethod == col.operationType,
orElse: () => SyncOperationType.create,
),
endpoint: col.endpoint,
data: _decodeJson(col.dataJson),
version: col.version,
createdAt: DateTime.fromMillisecondsSinceEpoch(col.createdAtEpoch),
retryCount: col.retryCount,
);
}
/// 安全编码 JSON
String _encodeJson(Map<String, dynamic> data) {
try {
return jsonEncode(data);
} catch (_) {
return '{}';
}
}
/// 安全解码 JSON
Map<String, dynamic> _decodeJson(String json) {
try {
return jsonDecode(json) as Map<String, dynamic>;
} catch (_) {
return {};
}
}
}