Files
nj/app/lib/data/services/sse_notification_service.dart
iven 49d4aa36a7
Some checks failed
Main Merge / backend (push) Has been cancelled
Main Merge / frontend (push) Has been cancelled
fix(app): Phase 1.1 紧急修复 — SyncEngine 接入 + authorId + catch 异常处理
- feat(sync): SyncEngine 接入 EditorPage, 保存时 enqueue + 网络恢复自动 trySync
- fix(editor): authorId 从 AuthBloc 获取, 替代硬编码 'local'
- fix(bloc): class_bloc/calendar/profile/parent catch(_).全部改为 debugPrint
- feat(editor): 编辑器工具栏拆分 (brush_panel/tag_panel/text_format_bar/dot_grid_painter)
- feat(editor): EditorBloc 扩展 + EditorPage 增强
- feat(search): SearchBloc 扩展搜索功能
- feat(home): HomeBloc/HomePage 增强
- feat(auth): LoginPage 增强
- feat(templates): TemplateGalleryPage 重构
- fix(web): 管理端班级/日记页面修复
- fix(server): comment_service + theme_handler 修复
- docs: 添加全链路审计报告和验证截图
2026-06-02 21:21:43 +08:00

147 lines
3.5 KiB
Dart
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// SSE 通知服务 — 监听服务端推送事件
import 'dart:async';
import 'dart:convert';
import 'package:dio/dio.dart';
/// A single notification delivered over the SSE channel.
///
/// Instances are immutable value holders; all fields are supplied by the
/// creator through the named constructor parameters.
class NotificationEvent {
  /// Creates an event from its [type], decoded [payload] and [receivedAt]
  /// timestamp.
  const NotificationEvent({
    required this.type,
    required this.payload,
    required this.receivedAt,
  });

  /// The event name attached to this notification.
  final String type;

  /// The decoded key/value body of the notification.
  final Map<String, dynamic> payload;

  /// The timestamp recorded for this event by whoever constructed it.
  final DateTime receivedAt;
}
/// SSE notification service that listens to the backend's Server-Sent Events
/// endpoint (`/message/stream`) and republishes each event on [events].
///
/// Usage:
/// ```dart
/// final service = SseNotificationService(
///   token: 'jwt-token',
///   baseUrl: 'https://api.example.com',
/// );
/// service.events.listen((event) {
///   // handle the notification
/// });
/// await service.connect();
/// // ... later, when the service is no longer needed:
/// service.dispose();
/// ```
class SseNotificationService {
  /// Delay before reconnecting after an established stream ends or errors.
  static const Duration _streamRetryDelay = Duration(seconds: 3);

  /// Delay before retrying after the connection request itself fails.
  static const Duration _connectRetryDelay = Duration(seconds: 5);

  final String _baseUrl;
  final String? _token;

  /// HTTP client, created on the first [connect] and reused for reconnects.
  Dio? _dio;

  StreamController<NotificationEvent>? _controller;

  /// Subscription to the decoded line stream of the current connection.
  StreamSubscription<String>? _subscription;

  /// Pending reconnect attempt, if any. At most one is scheduled at a time.
  Timer? _reconnectTimer;

  /// Whether a [connect] call is currently awaiting the HTTP response.
  bool _connecting = false;

  bool _disposed = false;

  /// Value of the `event:` field of the event currently being assembled.
  String? _pendingType;

  /// Accumulated `data:` field(s) of the event currently being assembled.
  String? _pendingData;

  /// Creates a service that talks to [baseUrl] and authenticates with
  /// [token] as a bearer token.
  SseNotificationService({
    required String token,
    required String baseUrl,
  })  : _token = token,
        _baseUrl = baseUrl;

  /// Broadcast stream of parsed notification events.
  ///
  /// The underlying controller is created lazily on first access; events
  /// received before the first access are dropped.
  Stream<NotificationEvent> get events {
    _controller ??= StreamController<NotificationEvent>.broadcast();
    return _controller!.stream;
  }

  /// Connects to the SSE endpoint and starts forwarding events.
  ///
  /// Does nothing after [dispose] or while another [connect] call is still
  /// in flight. Any previously open stream is cancelled first, so calling
  /// this repeatedly never results in more than one live connection.
  /// Failures are not thrown; a reconnect is scheduled instead.
  Future<void> connect() async {
    if (_disposed || _connecting) return;
    _connecting = true;

    // A manual connect supersedes any scheduled retry and any old stream.
    _reconnectTimer?.cancel();
    _reconnectTimer = null;
    _subscription?.cancel();
    _subscription = null;

    final dio = _dio ??= Dio(BaseOptions(
      baseUrl: _baseUrl,
      headers: {
        'Accept': 'text/event-stream',
        'Cache-Control': 'no-cache',
        if (_token != null) 'Authorization': 'Bearer $_token',
      },
      responseType: ResponseType.stream,
    ));

    try {
      final response = await dio.get<ResponseBody>('/message/stream');
      // dispose() may have run while the request was pending; in that case
      // the client has already been force-closed and nothing must be wired.
      if (_disposed) return;

      final body = response.data;
      if (body == null) return;

      // Start every connection with a clean parser state.
      _pendingType = null;
      _pendingData = null;

      // Decode and split at the stream level so that events (and multi-byte
      // UTF-8 characters) spanning several network chunks are reassembled,
      // and so that LF, CRLF and CR line endings are all recognised.
      final lines = const Utf8Decoder(allowMalformed: true)
          .bind(body.stream)
          .transform(const LineSplitter());

      _subscription = lines.listen(
        _handleLine,
        onError: (Object error) {
          _scheduleReconnect(_streamRetryDelay);
        },
        onDone: () {
          _scheduleReconnect(_streamRetryDelay);
        },
        // Without this, an error is followed by onDone and both handlers
        // would ask for a reconnect.
        cancelOnError: true,
      );
    } catch (_) {
      // Deliberate best-effort: whatever went wrong with the request, retry
      // later instead of surfacing the failure to the caller.
      _scheduleReconnect(_connectRetryDelay);
    } finally {
      _connecting = false;
    }
  }

  /// Schedules a single reconnect attempt after [delay].
  ///
  /// Replaces any attempt that is already pending, so at most one reconnect
  /// is ever outstanding. Does nothing once the service is disposed.
  void _scheduleReconnect(Duration delay) {
    if (_disposed) return;
    _reconnectTimer?.cancel();
    _reconnectTimer = Timer(delay, () {
      _reconnectTimer = null;
      if (!_disposed) connect();
    });
  }

  /// Consumes one line of the SSE stream and updates the pending event.
  ///
  /// An empty line terminates the current event: it is emitted when both an
  /// `event:` and a `data:` field were seen, and the pending state is reset
  /// either way so that fields never leak into the next event. Multiple
  /// `data:` lines of one event are joined with `\n`. Lines that are neither
  /// `event:` nor `data:` (comments, `id:`, `retry:`) are ignored.
  void _handleLine(String line) {
    if (line.isEmpty) {
      final type = _pendingType;
      final data = _pendingData;
      _pendingType = null;
      _pendingData = null;
      if (type != null && data != null) _emitEvent(type, data);
      return;
    }

    if (line.startsWith('event:')) {
      _pendingType = line.substring(6).trim();
    } else if (line.startsWith('data:')) {
      final chunk = line.substring(5).trim();
      final previous = _pendingData;
      _pendingData = previous == null ? chunk : '$previous\n$chunk';
    }
  }

  /// Decodes [data] as a JSON object and publishes it as an event of [type].
  ///
  /// Events are silently dropped when the service is disposed, when nobody
  /// has requested [events] yet, or when [data] is not a valid JSON object.
  void _emitEvent(String type, String data) {
    final controller = _controller;
    if (_disposed || controller == null || controller.isClosed) return;

    Object? decoded;
    try {
      decoded = jsonDecode(data);
    } on FormatException {
      // Malformed payloads are ignored on purpose.
      return;
    }
    if (decoded is! Map<String, dynamic>) return;

    controller.add(NotificationEvent(
      type: type,
      payload: decoded,
      receivedAt: DateTime.now(),
    ));
  }

  /// Disconnects and releases all resources. Safe to call more than once.
  ///
  /// Cancels any pending reconnect and the active stream subscription,
  /// closes the [events] stream and shuts down the HTTP client.
  void dispose() {
    if (_disposed) return;
    _disposed = true;

    _reconnectTimer?.cancel();
    _reconnectTimer = null;

    // Cancel the existing subscription; listening a second time on the
    // single-subscription response stream would throw.
    _subscription?.cancel();
    _subscription = null;

    _controller?.close();

    // Force-close: the SSE response never completes by itself, so a graceful
    // close could keep the connection open indefinitely.
    _dio?.close(force: true);
    _dio = null;
  }
}