Files
nj/app/lib/data/services/sse_notification_service.dart
iven 263ddf31a6 feat(app): 数据层集成 — RemoteJournalRepository + ClassRepository + SSE 通知
数据层新增:
- RemoteJournalRepository: 日记 CRUD + 元素管理,通过 ApiClient 连接后端
- ClassRepository: 班级/主题/评语 API 操作(getMyClasses/joinClass/assignTopic/createComment)
- SseNotificationService: SSE 实时通知监听 + 自动重连 + 事件流
- ApiException: 统一 API 错误封装
- DTO: ClassMemberDto + TopicDto + CommentDto

设计:
- Repository 模式: 抽象接口 + 远程实现 + 内存实现
- SSE: Dio stream + SSE 协议解析 + 3秒自动重连
- 所有 Repository 通过 ApiClient 注入,依赖现有 JWT 拦截器

验证: flutter analyze 0 error
2026-06-01 10:11:47 +08:00

147 lines
3.5 KiB
Dart
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// SSE 通知服务 — 监听服务端推送事件
import 'dart:async';
import 'dart:convert';
import 'package:dio/dio.dart';
/// A single notification delivered over the SSE channel.
///
/// Instances are immutable value holders; they carry no behaviour.
class NotificationEvent {
  /// Creates an event from its already-parsed parts.
  const NotificationEvent({
    required this.type,
    required this.payload,
    required this.receivedAt,
  });

  /// The event name, taken from the `event:` field of the SSE frame.
  final String type;

  /// The JSON object decoded from the `data:` field of the SSE frame.
  final Map<String, dynamic> payload;

  /// The local time at which the event was handed to the event stream.
  final DateTime receivedAt;
}
/// Listens to the backend's Server-Sent Events (SSE) endpoint and republishes
/// every complete event on a broadcast stream.
///
/// The connection is re-established automatically after it drops or fails
/// until [dispose] is called.
///
/// Usage:
/// ```dart
/// final service = SseNotificationService(token: 'jwt-token');
/// service.events.listen((event) {
///   // Handle the notification.
/// });
/// await service.connect();
/// ```
class SseNotificationService {
  /// Creates a service that authenticates with [token] against [baseUrl].
  ///
  /// No network activity happens until [connect] is called.
  SseNotificationService({
    required String token,
    String baseUrl = 'http://localhost:8080/api/v1',
  })  : _token = token,
        _baseUrl = baseUrl;

  /// Delay before reconnecting after an established stream ends or errors.
  static const Duration _streamRetryDelay = Duration(seconds: 3);

  /// Delay before reconnecting after a connection attempt fails.
  static const Duration _connectRetryDelay = Duration(seconds: 5);

  final String _baseUrl;
  final String _token;
  final StreamController<NotificationEvent> _controller =
      StreamController<NotificationEvent>.broadcast();

  /// HTTP client, created on first [connect] and reused across reconnects so
  /// that a new client is not leaked on every retry.
  Dio? _dio;

  /// Subscription to the decoded line stream of the active connection.
  StreamSubscription<String>? _subscription;

  /// Pending reconnect, if any. At most one is ever scheduled.
  Timer? _reconnectTimer;

  /// Whether a connection attempt is currently awaiting the HTTP response.
  bool _connecting = false;
  bool _disposed = false;

  // Parser state for the event currently being assembled. It lives on the
  // instance because a single event may arrive split over several chunks.
  String? _eventType;
  final List<String> _dataLines = <String>[];

  /// The stream of notification events.
  ///
  /// This is a broadcast stream: events emitted while nobody is listening are
  /// dropped. The stream closes when [dispose] is called.
  Stream<NotificationEvent> get events => _controller.stream;

  /// Connects to the SSE endpoint and starts forwarding events to [events].
  ///
  /// Does nothing if the service is disposed, already connected, or already
  /// connecting. Failures are not thrown; a reconnect is scheduled instead.
  Future<void> connect() async {
    if (_disposed || _connecting || _subscription != null) return;
    _connecting = true;
    // An explicit connect supersedes any reconnect that was still pending.
    _reconnectTimer?.cancel();
    _reconnectTimer = null;

    final client = _dio ??= _createClient();
    try {
      final response = await client.get<ResponseBody>('/message/stream');
      // dispose() may have run while the request was in flight.
      if (_disposed) return;

      final body = response.data;
      if (body == null) {
        _scheduleReconnect(_connectRetryDelay);
        return;
      }

      _resetParser();
      // Decode and split incrementally so that multi-byte characters and
      // lines spanning chunk boundaries are reassembled correctly.
      // Malformed bytes become U+FFFD instead of erroring the stream.
      _subscription = const Utf8Decoder(allowMalformed: true)
          .bind(body.stream)
          .transform(const LineSplitter())
          .listen(
            _handleLine,
            onError: (Object _) => _handleStreamEnd(),
            onDone: _handleStreamEnd,
          );
    } catch (_) {
      // Deliberate best effort: any connection failure leads to a retry.
      _scheduleReconnect(_connectRetryDelay);
    } finally {
      _connecting = false;
    }
  }

  /// Builds the HTTP client configured for a streaming SSE request.
  Dio _createClient() {
    return Dio(BaseOptions(
      baseUrl: _baseUrl,
      headers: {
        'Accept': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'Authorization': 'Bearer $_token',
      },
      responseType: ResponseType.stream,
    ));
  }

  /// Tears down the finished connection and schedules a reconnect.
  ///
  /// Used for both stream errors and normal completion. Cancelling the
  /// subscription first guarantees an error followed by "done" cannot
  /// schedule two reconnects.
  void _handleStreamEnd() {
    final finished = _subscription;
    _subscription = null;
    finished?.cancel();
    _scheduleReconnect(_streamRetryDelay);
  }

  /// Schedules a single reconnect after [delay] unless one is already pending
  /// or the service has been disposed.
  void _scheduleReconnect(Duration delay) {
    if (_disposed || _reconnectTimer != null) return;
    _reconnectTimer = Timer(delay, () {
      _reconnectTimer = null;
      connect();
    });
  }

  /// Discards any partially assembled event.
  void _resetParser() {
    _eventType = null;
    _dataLines.clear();
  }

  /// Processes one line of the SSE stream.
  ///
  /// `event:` sets the event type, `data:` appends a data line, and an empty
  /// line terminates the event. Other lines (comments, `id:`, `retry:`) are
  /// ignored. An event is emitted only when both a type and data were seen.
  void _handleLine(String line) {
    if (line.isEmpty) {
      final type = _eventType;
      if (type != null && _dataLines.isNotEmpty) {
        // Multiple data lines form one payload joined by newlines.
        _emitEvent(type, _dataLines.join('\n'));
      }
      _resetParser();
      return;
    }

    if (line.startsWith('event:')) {
      _eventType = line.substring(6).trim();
    } else if (line.startsWith('data:')) {
      final value = line.substring(5);
      // The SSE format allows one optional space after the colon.
      _dataLines.add(value.startsWith(' ') ? value.substring(1) : value);
    }
  }

  /// Decodes [data] as a JSON object and publishes it as an event of [type].
  ///
  /// Payloads that are not valid JSON, or whose top level is not a JSON
  /// object, are dropped.
  void _emitEvent(String type, String data) {
    if (_disposed || _controller.isClosed) return;
    try {
      final decoded = jsonDecode(data);
      if (decoded is Map<String, dynamic>) {
        _controller.add(NotificationEvent(
          type: type,
          payload: decoded,
          receivedAt: DateTime.now(),
        ));
      }
    } on FormatException {
      // Malformed JSON: skip this event and keep the connection alive.
    }
  }

  /// Disconnects and releases all resources.
  ///
  /// Cancels any pending reconnect and the active stream subscription, closes
  /// [events], and shuts down the HTTP client. Safe to call more than once.
  void dispose() {
    if (_disposed) return;
    _disposed = true;
    _reconnectTimer?.cancel();
    _reconnectTimer = null;
    _subscription?.cancel();
    _subscription = null;
    _controller.close();
    // Force-close so a request that is still in flight is aborted too.
    _dio?.close(force: true);
    _dio = null;
  }
}