实时通信模块
当你需要“后端变更后立刻通知前端”,比如在线人数、字典更新、系统广播消息等场景,可以使用本模块。
youlai-boot 基于 SSE (Server-Sent Events) 实现实时通信功能,支持服务端主动推送消息到前端。本文涵盖:
- SSE 的整体架构与数据流
- 事件主题(Topic)与前端订阅方式
- 断线重连与鉴权建议
设计目标
| 目标 | 说明 |
|---|---|
| 实时推送 | 服务端数据变更后立即通知前端,无需轮询 |
| 按需刷新 | 通知前端清除缓存,下次使用时再加载,避免缓存穿透 |
| 多设备支持 | 同一用户支持多设备同时在线 |
| 安全认证 | 基于 JWT 的 SSE 连接认证 |
| 断线重连 | 前端自动重连,支持指数退避 |
应用场景
| 场景 | 说明 | 事件名称 |
|---|---|---|
| 用户在线状态 | 实时统计在线用户数,支持多设备登录 | online-count |
| 字典变更通知 | 字典数据变更后通知前端清除缓存 | dict |
| 系统消息 | 系统广播消息 | system |
整体架构
核心组件
| 组件 | 说明 |
|---|---|
SseController | SSE 连接端点,处理客户端连接请求 |
SseService | 核心服务,用户在线管理与消息推送 |
SseSessionRegistry | SSE 会话注册表,维护在线状态 |
SseTopics | 事件主题常量定义 |
DictChangeEvent | 字典变更事件 DTO |
OnlineUserDTO | 在线用户信息 DTO |
连接与认证流程
用户在线状态管理
数据结构
采用三 Map 结构实现高效查询:
| 映射表 | Key | Value | 用途 |
|---|---|---|---|
userEmittersMap | 用户名 | Emitter集合 | 支持多设备登录,统计用户会话数 |
emitterUserMap | Emitter | 用户名 | 快速定位用户 |
emitterTimeMap | Emitter | 连接时间 | 获取连接时间 |
在线用户统计
java
// 在线用户数(非连接数)
public int getOnlineUserCount() {
return userEmittersMap.size();
}
// 在线连接总数(包含多设备)
public int getTotalConnectionCount() {
return emitterUserMap.size();
}多设备登录处理
字典变更通知
设计思路
采用"通知清除,按需加载"策略,避免推送完整字典数据:
优势分析
| 策略 | 实时性 | 带宽消耗 | 缓存一致性 | 适用场景 |
|---|---|---|---|---|
| 推送完整数据 | 高 | 高 | 强 | 数据量小 |
| 推送变更数据 | 高 | 中 | 中 | 需增量更新 |
| 通知清除(本项目) | 高 | 低 | 强 | 通用场景 |
| 过期刷新 | 低 | 低 | 弱 | 数据实时性要求低 |
消息结构
后端 DTO
java
@Data
public class DictChangeEvent implements Serializable {
/** 字典编码 */
private String dictCode;
/** 事件时间戳 */
private long timestamp;
}前端接口
typescript
interface DictChangeMessage {
/** 字典编码 */
dictCode: string;
/** 时间戳 */
timestamp: number;
}字典变更流程
SSE 事件定义
| 事件名称 | 类型 | 说明 | 消息体 |
|---|---|---|---|
dict | 广播 | 字典变更通知 | DictChangeEvent |
online-count | 广播 | 在线用户数变更 | Integer |
system | 广播 | 系统消息 | Object |
前端集成
连接管理
typescript
// useSse.ts - SSE 连接管理
const sse = useSse({
url: "/api/v1/sse/connect", // SSE 端点
connectionTimeout: 10000, // 连接超时
reconnectInterval: 5000, // 重连间隔基数
maxReconnectInterval: 120000, // 重连间隔上限
maxReconnectAttempts: 10, // 最大重连次数
});
// 建立连接
sse.connect();
// 订阅事件
const unsubscribe = sse.on("dict", (data) => {
console.log("字典变更:", data.dictCode);
});
// 断开连接
sse.disconnect();字典同步
typescript
// useDictSync.ts - 字典同步
const dictSync = useDictSync();
// 初始化(应用启动时)
dictSync.initialize();
// 注册回调
dictSync.onDictChange((message) => {
console.log("字典已更新:", message.dictCode);
});在线人数
typescript
// useOnlineCount.ts - 在线人数
const onlineCount = useOnlineCount();
// 初始化
onlineCount.initialize();
// 获取在线人数
console.log("在线人数:", onlineCount.onlineUserCount.value);核心特性
| 特性 | 说明 |
|---|---|
| 自动重连 | 断线后自动重连,支持指数退避 |
| 事件订阅 | 支持订阅多个事件类型 |
| 连接状态管理 | 提供连接状态枚举 |
| 单例模式 | 全局单例,避免重复连接 |
配置项
SSE 连接超时
后端 SSE 连接默认超时 30 分钟:
java
// SseService.java
private static final long TIMEOUT = 30 * 60 * 1000L; // 30 分钟安全配置
SSE 端点需要认证,前端在请求头携带 JWT Token:
http
GET /api/v1/sse/connect HTTP/1.1
Authorization: Bearer {token}
Accept: text/event-stream定时任务
系统通过定时任务定期广播在线用户数,确保数据一致性:
java
@Scheduled(fixedRate = 60000) // 每分钟
public void broadcastOnlineCount() {
sseService.sendOnlineCount();
}最佳实践
后端
- 参数校验:所有公开方法进行参数校验
- 线程安全:使用
ConcurrentHashMap保证线程安全 - 日志记录:连接/断开/消息推送记录日志
- 资源清理:用户断开时通过 onCompletion/onTimeout/onError 回调清理
- 超时设置:SSE 连接设置合理超时时间(默认 30 分钟)
前端
- 单例模式:SSE 连接使用单例,避免重复连接
- 按需订阅:只订阅必要的事件类型
- 错误处理:消息解析失败时记录日志,不影响主流程
- 缓存策略:收到通知后清除缓存,下次使用时再加载
- 指数退避:重连时使用指数退避策略,避免频繁重连
相关文件
| 文件路径 | 说明 |
|---|---|
message/controller/SseController.java | SSE 控制器 |
message/service/SseService.java | SSE 服务 |
message/registry/SseSessionRegistry.java | 会话注册表 |
message/dto/DictChangeEvent.java | 字典变更事件 |
message/topic/SseTopics.java | 事件常量定义 |
message/job/OnlineUserCountJob.java | 在线人数定时任务 |
vue3-element-admin/src/composables/sse/useSse.ts | SSE 连接管理 |
vue3-element-admin/src/composables/sse/useDictSync.ts | 字典同步 |
vue3-element-admin/src/composables/sse/useOnlineCount.ts | 在线人数 |
