引言
WebSocket
是一种全双工通信协议,允许客户端和服务器之间建立持久连接,实现实时数据传输。项目中集成 WebSocket
可以用于实现实时聊天、通知推送、实时数据更新等场景。
与传统的 HTTP
请求-响应模式相比,WebSocket
具有以下好处和优势:
1. 实时性
WebSocket 支持服务器主动向客户端推送数据,适用于实时聊天、通知推送、实时数据更新等场景。
避免了 HTTP 轮询带来的延迟和资源浪费。
2. 高效性
WebSocket 建立连接后,客户端和服务器可以通过一个持久连接进行双向通信,减少了连接建立和关闭的开销。
数据以帧的形式传输,减少了协议头部的开销。
3. 减少带宽消耗
与 HTTP 轮询相比,WebSocket 只需要建立一次连接,后续通信无需重复发送 HTTP 头部信息,节省了带宽。
4. 支持全双工通信
WebSocket 允许客户端和服务器同时发送和接收数据,适合需要双向交互的场景。
5. 跨平台支持
WebSocket 协议被现代浏览器广泛支持,同时也可以通过客户端库在移动端和桌面端使用。
6. 易于集成
Spring Boot 提供了对 WebSocket 的全面支持,结合 STOMP 协议可以快速实现消息的发布和订阅
添加依赖
在 pom.xml
中添加 WebSocket
依赖:
js
<!-- WebSocket依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
在 xiaomayi-common/xiaomayi-websocket
模块中已经引入此依赖,在实际使用时直接引入以下依赖即可:
js
<!-- WebSocket依赖模块 -->
<dependency>
<groupId>com.xiaomayi</groupId>
<artifactId>xiaomayi-websocket</artifactId>
</dependency>
配置 WebSocket
创建 WebSocket
通讯配置文件 WebSocketConfig
并开启服务支持。
js
package com.xiaomayi.websocket.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* <p>
* WebSocket配置类,开启WebSocket支持
* </p>
*
* @author 小蚂蚁云团队
* @since 2024-05-21
*/
@Configuration
@EnableWebSocket
public class WebSocketConfig {
/**
* 注入Bean对象
* 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
*
* @return 返回结果
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
温馨提示
上述配置文件中 @EnableWebSocket
表示开启 WebSocket
支持。
WebSocket端点服务
js
package com.xiaomayi.websocket.server;
import com.alibaba.fastjson2.JSON;
import com.xiaomayi.core.utils.StringUtils;
import com.xiaomayi.websocket.dto.SocketMessage;
import jakarta.websocket.*;
import jakarta.websocket.server.PathParam;
import jakarta.websocket.server.ServerEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* <p>
* WebSocket端点服务类
* </p>
*
* @author 小蚂蚁云团队
* @since 2024-05-21
*/
@Slf4j
@ServerEndpoint("/websocket/{sid}")
@Component
public class WebSocketServer {
/**
* 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
*/
private static int onlineCount = 0;
/**
* concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
*/
private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<>();
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
/**
* 接收sid
*/
private String sid = "";
/**
* onOpen 连接建立成功调用的方法
*
* @param session session
* @param sid sid
*/
@OnOpen
public void onOpen(Session session, @PathParam("sid") String sid) {
this.session = session;
//加入set中
webSocketSet.add(this);
//在线数加1
addOnlineCount();
log.info("有新窗口开始监听:" + sid + ",当前在线人数为" + getOnlineCount());
this.sid = sid;
try {
sendMessage("连接成功");
} catch (IOException e) {
log.error("websocket IO异常");
}
}
/**
* onClose 连接关闭调用的方法
*/
@OnClose
public void onClose() {
//从set中删除
webSocketSet.remove(this);
//在线数减1
subOnlineCount();
log.info("有一连接关闭!当前在线人数为" + getOnlineCount());
}
/**
* onMessage 收到客户端消息后调用的方法
*
* @param message message
* @param session session
*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("收到来自窗口" + sid + "的信息:" + message);
try {
// 消息对象转换
SocketMessage socketMessage = JSON.parseObject(message, SocketMessage.class);
if (StringUtils.isNotNull(socketMessage)) {
List<WebSocketServer> list = new ArrayList<>(webSocketSet);
// 检索接收方
WebSocketServer webSocketServer = list
.stream()
.filter(v -> v.sid.equals(socketMessage.getTo()))
.findFirst()
.orElse(null);
if (StringUtils.isNotNull(webSocketServer)) {
// 指定用户发送
webSocketServer.sendMessage(message);
}
}
} catch (IOException e) {
log.error("消息发送失败:{}", e.getMessage());
}
}
/**
* onError 发生错误
*
* @param session session
* @param error error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误");
}
/**
* sendMessage 实现服务器主动推送
*
* @param message message
* @throws IOException IOException
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
/**
* sendInfo 群发自定义消息
*
* @param message message
* @param sid sid
* @throws IOException IOException
*/
public static void sendInfo(String message, @PathParam("sid") String sid) throws IOException {
log.info("推送消息到窗口" + sid + ",推送内容:" + message);
try {
for (WebSocketServer item : webSocketSet) {
//这里可以设定只推送给这个sid的,为null则全部推送
if (sid == null) {
item.sendMessage(message);
} else if (item.sid.equals(sid)) {
item.sendMessage(message);
}
}
} catch (IOException e) {
log.error("消息发送失败:{}", e.getMessage());
}
}
/**
* 获取在线人数
*
* @return 返回结果
*/
public static synchronized int getOnlineCount() {
return onlineCount;
}
/**
* 上线人数计数器
*/
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
/**
* 下线人数计数器
*/
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
}
创建 WebSocket 组件
在前端 Vue
项目中自定义创建 WebSocket
组件。
js
<template>
<div></div>
</template>
<script setup lang="ts" name="global-websocket">
import { reactive, ref, computed, onMounted, onUnmounted } from 'vue';
import { useUserStore } from '@/store/modules/user';
const emit = defineEmits(['rollback']);
/**
* 定义接收的参数
*/
const props = defineProps({
uri: {
type: String,
},
});
/**
* 定义Socket状态
*/
const state = reactive({
webSocket: ref(), // webSocket实例
lockReconnect: false, // 重连锁,避免多次重连
maxReconnect: 6, // 最大重连次数, -1 标识无限重连
reconnectTime: 0, // 重连尝试次数
heartbeat: {
interval: 30 * 1000, // 心跳间隔时间
timeout: 10 * 1000, // 响应超时时间
pingTimeoutObj: ref(), // 延时发送心跳的定时器
pongTimeoutObj: ref(), // 接收心跳响应的定时器
pingMessage: JSON.stringify({ type: 'ping' }), // 心跳请求信息
},
});
/**
* 获取Token令牌
*/
const token = computed(() => {
return useUserStore().getToken;
});
const tenant = computed(() => {
return Session.getTenant();
});
/**
* 钩子函数
*/
onMounted(() => {
initWebSocket();
});
onUnmounted(() => {
state.webSocket.close();
clearTimeoutObj(state.heartbeat);
});
/**
* 初始化WebSocket对象
*/
const initWebSocket = () => {
// ws地址
let host = window.location.host;
let wsUri = `${location.protocol === 'https:' ? 'wss' : 'ws'}://${host}${props.uri}`;
// 建立连接
state.webSocket = new WebSocket(wsUri);
// 连接成功
state.webSocket.onopen = onOpen;
// 连接错误
state.webSocket.onerror = onError;
// 接收信息
state.webSocket.onmessage = onMessage;
// 连接关闭
state.webSocket.onclose = onClose;
};
/**
* 重连机制
*/
const reconnect = () => {
if (!token.value) {
return;
}
if (
state.lockReconnect ||
(state.maxReconnect !== -1 && state.reconnectTime > state.maxReconnect)
) {
return;
}
state.lockReconnect = true;
setTimeout(() => {
state.reconnectTime++;
// 建立新连接
initWebSocket();
state.lockReconnect = false;
}, 5000);
};
/**
* 清空定时器
*/
const clearTimeoutObj = (heartbeat: any) => {
heartbeat.pingTimeoutObj && clearTimeout(heartbeat.pingTimeoutObj);
heartbeat.pongTimeoutObj && clearTimeout(heartbeat.pongTimeoutObj);
};
/**
* 开启心跳
*/
const startHeartbeat = () => {
const webSocket = state.webSocket;
const heartbeat = state.heartbeat;
// 清空定时器
clearTimeoutObj(heartbeat);
// 延时发送下一次心跳
heartbeat.pingTimeoutObj = setTimeout(() => {
// 如果连接正常
if (webSocket.readyState === 1) {
// 这里发送一个心跳,后端收到后,返回一个心跳消息,
webSocket.send(heartbeat.pingMessage);
// 心跳发送后,如果服务器超时未响应则断开,如果响应了会被重置心跳定时器
heartbeat.pongTimeoutObj = setTimeout(() => {
webSocket.close();
}, heartbeat.timeout);
} else {
// 否则重连
reconnect();
}
}, heartbeat.interval);
};
/**
* 连接成功事件
*/
const onOpen = () => {
//开启心跳
startHeartbeat();
state.reconnectTime = 0;
};
/**
* 连接失败事件
* @param e
*/
const onError = () => {
//重连
reconnect();
};
/**
* 连接关闭事件
* @param e
*/
const onClose = () => {
//重连
reconnect();
};
/**
* 接收服务器推送的信息
* @param msgEvent 消息事件
*/
const onMessage = (msgEvent: any) => {
// 收到服务器信息,心跳重置并发送
startHeartbeat();
// 回调函数
emit('rollback', msgEvent.data);
};
</script>
使用 WebSocket 组件
js
<template>
<!-- 全局WebSocket通讯组件 -->
<global-websocket :uri="'/api/websocket/' + userInfo.id" @rollback="rollback" />
</template>
<script lang="ts" setup>
/**
* 定义参数
*/
const GlobalWebsocket = defineAsyncComponent(() => import('@/components/Websocket/index.vue'));
</script>
<style lang="scss" scoped>
</style>
WebSocket连接测试
使用 ApiFox
调试工具,输入请求地址:
js
ws://127.0.0.1:8081/api/websocket/1
总结
通过以上实现方案,可以快速集成 WebSocket
功能,适用于实时聊天、通知推送、实时数据更新等场景。如果需要支持大规模并发,可以结合外部消息代理(如 RabbitMQ
)使用。