Skip to content

实时通信模块

当你需要“后端变更后立刻通知前端”,比如在线人数、字典更新、系统广播消息等场景,可以使用本模块。

youlai-boot 基于 SSE (Server-Sent Events) 实现实时通信功能,支持服务端主动推送消息到前端。本文涵盖:

  • SSE 的整体架构与数据流
  • 事件主题(Topic)与前端订阅方式
  • 断线重连与鉴权建议

设计目标

目标说明
实时推送服务端数据变更后立即通知前端,无需轮询
按需刷新通知前端清除缓存,下次使用时再加载,避免缓存穿透
多设备支持同一用户支持多设备同时在线
安全认证基于 JWT 的 SSE 连接认证
断线重连前端自动重连,支持指数退避

应用场景

场景说明事件名称
用户在线状态实时统计在线用户数,支持多设备登录online-count
字典变更通知字典数据变更后通知前端清除缓存dict
系统消息系统广播消息system

整体架构

核心组件

组件说明
SseControllerSSE 连接端点,处理客户端连接请求
SseService核心服务,用户在线管理与消息推送
SseSessionRegistrySSE 会话注册表,维护在线状态
SseTopics事件主题常量定义
DictChangeEvent字典变更事件 DTO
OnlineUserDTO在线用户信息 DTO

连接与认证流程

用户在线状态管理

数据结构

采用三 Map 结构实现高效查询:

映射表KeyValue用途
userEmittersMap用户名Emitter集合支持多设备登录,统计用户会话数
emitterUserMapEmitter用户名快速定位用户
emitterTimeMapEmitter连接时间获取连接时间

在线用户统计

java
// 在线用户数(非连接数)
public int getOnlineUserCount() {
    return userEmittersMap.size();
}

// 在线连接总数(包含多设备)
public int getTotalConnectionCount() {
    return emitterUserMap.size();
}

多设备登录处理

字典变更通知

设计思路

采用"通知清除,按需加载"策略,避免推送完整字典数据:

优势分析

策略实时性带宽消耗缓存一致性适用场景
推送完整数据数据量小
推送变更数据需增量更新
通知清除(本项目)通用场景
过期刷新数据实时性要求低

消息结构

后端 DTO

java
@Data
public class DictChangeEvent implements Serializable {
    /** 字典编码 */
    private String dictCode;
    /** 事件时间戳 */
    private long timestamp;
}

前端接口

typescript
interface DictChangeMessage {
  /** 字典编码 */
  dictCode: string;
  /** 时间戳 */
  timestamp: number;
}

字典变更流程

SSE 事件定义

事件名称类型说明消息体
dict广播字典变更通知DictChangeEvent
online-count广播在线用户数变更Integer
system广播系统消息Object

前端集成

连接管理

typescript
// useSse.ts - SSE 连接管理
const sse = useSse({
  url: "/api/v1/sse/connect", // SSE 端点
  connectionTimeout: 10000, // 连接超时
  reconnectInterval: 5000, // 重连间隔基数
  maxReconnectInterval: 120000, // 重连间隔上限
  maxReconnectAttempts: 10, // 最大重连次数
});

// 建立连接
sse.connect();

// 订阅事件
const unsubscribe = sse.on("dict", (data) => {
  console.log("字典变更:", data.dictCode);
});

// 断开连接
sse.disconnect();

字典同步

typescript
// useDictSync.ts - 字典同步
const dictSync = useDictSync();

// 初始化(应用启动时)
dictSync.initialize();

// 注册回调
dictSync.onDictChange((message) => {
  console.log("字典已更新:", message.dictCode);
});

在线人数

typescript
// useOnlineCount.ts - 在线人数
const onlineCount = useOnlineCount();

// 初始化
onlineCount.initialize();

// 获取在线人数
console.log("在线人数:", onlineCount.onlineUserCount.value);

核心特性

特性说明
自动重连断线后自动重连,支持指数退避
事件订阅支持订阅多个事件类型
连接状态管理提供连接状态枚举
单例模式全局单例,避免重复连接

配置项

SSE 连接超时

后端 SSE 连接默认超时 30 分钟:

java
// SseService.java
private static final long TIMEOUT = 30 * 60 * 1000L; // 30 分钟

安全配置

SSE 端点需要认证,前端在请求头携带 JWT Token:

http
GET /api/v1/sse/connect HTTP/1.1
Authorization: Bearer {token}
Accept: text/event-stream

定时任务

系统通过定时任务定期广播在线用户数,确保数据一致性:

java
@Scheduled(fixedRate = 60000) // 每分钟
public void broadcastOnlineCount() {
    sseService.sendOnlineCount();
}

最佳实践

后端

  1. 参数校验:所有公开方法进行参数校验
  2. 线程安全:使用 ConcurrentHashMap 保证线程安全
  3. 日志记录:连接/断开/消息推送记录日志
  4. 资源清理:用户断开时通过 onCompletion/onTimeout/onError 回调清理
  5. 超时设置:SSE 连接设置合理超时时间(默认 30 分钟)

前端

  1. 单例模式:SSE 连接使用单例,避免重复连接
  2. 按需订阅:只订阅必要的事件类型
  3. 错误处理:消息解析失败时记录日志,不影响主流程
  4. 缓存策略:收到通知后清除缓存,下次使用时再加载
  5. 指数退避:重连时使用指数退避策略,避免频繁重连

相关文件

文件路径说明
message/controller/SseController.javaSSE 控制器
message/service/SseService.javaSSE 服务
message/registry/SseSessionRegistry.java会话注册表
message/dto/DictChangeEvent.java字典变更事件
message/topic/SseTopics.java事件常量定义
message/job/OnlineUserCountJob.java在线人数定时任务
vue3-element-admin/src/composables/sse/useSse.tsSSE 连接管理
vue3-element-admin/src/composables/sse/useDictSync.ts字典同步
vue3-element-admin/src/composables/sse/useOnlineCount.ts在线人数

下一步

基于 MIT 许可发布 · 由 ❤️ 和 ☕ 驱动 · 支持作者