Skip to content

引言

WebSocket 是一种全双工通信协议,允许客户端和服务器之间建立持久连接,实现实时数据传输。项目中集成 WebSocket 可以用于实现实时聊天、通知推送、实时数据更新等场景。

与传统的 HTTP 请求-响应模式相比,WebSocket 具有以下好处和优势:

1. 实时性
    WebSocket 支持服务器主动向客户端推送数据,适用于实时聊天、通知推送、实时数据更新等场景。
    避免了 HTTP 轮询带来的延迟和资源浪费。
2. 高效性
    WebSocket 建立连接后,客户端和服务器可以通过一个持久连接进行双向通信,减少了连接建立和关闭的开销。
    数据以帧的形式传输,减少了协议头部的开销。
3. 减少带宽消耗
    与 HTTP 轮询相比,WebSocket 只需要建立一次连接,后续通信无需重复发送 HTTP 头部信息,节省了带宽。
4. 支持全双工通信
    WebSocket 允许客户端和服务器同时发送和接收数据,适合需要双向交互的场景。
5. 跨平台支持
    WebSocket 协议被现代浏览器广泛支持,同时也可以通过客户端库在移动端和桌面端使用。
6. 易于集成
    Spring Boot 提供了对 WebSocket 的全面支持,结合 STOMP 协议可以快速实现消息的发布和订阅

添加依赖

pom.xml 中添加 WebSocket 依赖:

js
<!-- WebSocket依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

xiaomayi-common/xiaomayi-websocket 模块中已经引入此依赖,在实际使用时直接引入以下依赖即可:

js
<!-- WebSocket依赖模块 -->
<dependency>
    <groupId>com.xiaomayi</groupId>
    <artifactId>xiaomayi-websocket</artifactId>
</dependency>

配置 WebSocket

创建 WebSocket 通讯配置文件 WebSocketConfig 并开启服务支持。

js
package com.xiaomayi.websocket.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * <p>
 * WebSocket配置类,开启WebSocket支持
 * </p>
 *
 * @author 小蚂蚁云团队
 * @since 2024-05-21
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig {

    /**
     * 注入Bean对象
     * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
     *
     * @return 返回结果
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

}

温馨提示

上述配置文件中 @EnableWebSocket 表示开启 WebSocket 支持。

WebSocket端点服务

js
package com.xiaomayi.websocket.server;

import com.alibaba.fastjson2.JSON;
import com.xiaomayi.core.utils.StringUtils;
import com.xiaomayi.websocket.dto.SocketMessage;
import jakarta.websocket.*;
import jakarta.websocket.server.PathParam;
import jakarta.websocket.server.ServerEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * <p>
 * WebSocket端点服务类
 * </p>
 *
 * @author 小蚂蚁云团队
 * @since 2024-05-21
 */
@Slf4j
@ServerEndpoint("/websocket/{sid}")
@Component
public class WebSocketServer {

    /**
     * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
     */
    private static int onlineCount = 0;

    /**
     * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
     */
    private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<>();

    /**
     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    private Session session;

    /**
     * 接收sid
     */
    private String sid = "";

    /**
     * onOpen 连接建立成功调用的方法
     *
     * @param session session
     * @param sid     sid
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("sid") String sid) {
        this.session = session;
        //加入set中
        webSocketSet.add(this);
        //在线数加1
        addOnlineCount();
        log.info("有新窗口开始监听:" + sid + ",当前在线人数为" + getOnlineCount());
        this.sid = sid;
        try {
            sendMessage("连接成功");
        } catch (IOException e) {
            log.error("websocket IO异常");
        }
    }

    /**
     * onClose 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        //从set中删除
        webSocketSet.remove(this);
        //在线数减1
        subOnlineCount();
        log.info("有一连接关闭!当前在线人数为" + getOnlineCount());
    }

    /**
     * onMessage 收到客户端消息后调用的方法
     *
     * @param message message
     * @param session session
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("收到来自窗口" + sid + "的信息:" + message);
        try {
            // 消息对象转换
            SocketMessage socketMessage = JSON.parseObject(message, SocketMessage.class);
            if (StringUtils.isNotNull(socketMessage)) {
                List<WebSocketServer> list = new ArrayList<>(webSocketSet);
                // 检索接收方
                WebSocketServer webSocketServer = list
                        .stream()
                        .filter(v -> v.sid.equals(socketMessage.getTo()))
                        .findFirst()
                        .orElse(null);
                if (StringUtils.isNotNull(webSocketServer)) {
                    // 指定用户发送
                    webSocketServer.sendMessage(message);
                }
            }
        } catch (IOException e) {
            log.error("消息发送失败:{}", e.getMessage());
        }
    }

    /**
     * onError 发生错误
     *
     * @param session session
     * @param error   error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误");
    }

    /**
     * sendMessage 实现服务器主动推送
     *
     * @param message message
     * @throws IOException IOException
     */
    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }

    /**
     * sendInfo 群发自定义消息
     *
     * @param message message
     * @param sid     sid
     * @throws IOException IOException
     */
    public static void sendInfo(String message, @PathParam("sid") String sid) throws IOException {
        log.info("推送消息到窗口" + sid + ",推送内容:" + message);
        try {
            for (WebSocketServer item : webSocketSet) {
                //这里可以设定只推送给这个sid的,为null则全部推送
                if (sid == null) {
                    item.sendMessage(message);
                } else if (item.sid.equals(sid)) {
                    item.sendMessage(message);
                }
            }
        } catch (IOException e) {
            log.error("消息发送失败:{}", e.getMessage());
        }
    }

    /**
     * 获取在线人数
     *
     * @return 返回结果
     */
    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    /**
     * 上线人数计数器
     */
    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    /**
     * 下线人数计数器
     */
    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }

}

创建 WebSocket 组件

在前端 Vue 项目中自定义创建 WebSocket 组件。

js
<template>
  <div></div>
</template>
<script setup lang="ts" name="global-websocket">
  import { reactive, ref, computed, onMounted, onUnmounted } from 'vue';
  import { useUserStore } from '@/store/modules/user';

  const emit = defineEmits(['rollback']);

  /**
   * 定义接收的参数
   */
  const props = defineProps({
    uri: {
      type: String,
    },
  });

  /**
   * 定义Socket状态
   */
  const state = reactive({
    webSocket: ref(), // webSocket实例
    lockReconnect: false, // 重连锁,避免多次重连
    maxReconnect: 6, // 最大重连次数, -1 标识无限重连
    reconnectTime: 0, // 重连尝试次数
    heartbeat: {
      interval: 30 * 1000, // 心跳间隔时间
      timeout: 10 * 1000, // 响应超时时间
      pingTimeoutObj: ref(), // 延时发送心跳的定时器
      pongTimeoutObj: ref(), // 接收心跳响应的定时器
      pingMessage: JSON.stringify({ type: 'ping' }), // 心跳请求信息
    },
  });

  /**
   * 获取Token令牌
   */
  const token = computed(() => {
    return useUserStore().getToken;
  });

  const tenant = computed(() => {
    return Session.getTenant();
  });

  /**
   * 钩子函数
   */
  onMounted(() => {
    initWebSocket();
  });

  onUnmounted(() => {
    state.webSocket.close();
    clearTimeoutObj(state.heartbeat);
  });

  /**
   * 初始化WebSocket对象
   */
  const initWebSocket = () => {
    // ws地址
    let host = window.location.host;
    let wsUri = `${location.protocol === 'https:' ? 'wss' : 'ws'}://${host}${props.uri}`;

    // 建立连接
    state.webSocket = new WebSocket(wsUri);
    // 连接成功
    state.webSocket.onopen = onOpen;
    // 连接错误
    state.webSocket.onerror = onError;
    // 接收信息
    state.webSocket.onmessage = onMessage;
    // 连接关闭
    state.webSocket.onclose = onClose;
  };

  /**
   * 重连机制
   */
  const reconnect = () => {
    if (!token.value) {
      return;
    }
    if (
      state.lockReconnect ||
      (state.maxReconnect !== -1 && state.reconnectTime > state.maxReconnect)
    ) {
      return;
    }
    state.lockReconnect = true;
    setTimeout(() => {
      state.reconnectTime++;
      // 建立新连接
      initWebSocket();
      state.lockReconnect = false;
    }, 5000);
  };

  /**
   * 清空定时器
   */
  const clearTimeoutObj = (heartbeat: any) => {
    heartbeat.pingTimeoutObj && clearTimeout(heartbeat.pingTimeoutObj);
    heartbeat.pongTimeoutObj && clearTimeout(heartbeat.pongTimeoutObj);
  };

  /**
   * 开启心跳
   */
  const startHeartbeat = () => {
    const webSocket = state.webSocket;
    const heartbeat = state.heartbeat;
    // 清空定时器
    clearTimeoutObj(heartbeat);
    // 延时发送下一次心跳
    heartbeat.pingTimeoutObj = setTimeout(() => {
      // 如果连接正常
      if (webSocket.readyState === 1) {
        // 这里发送一个心跳,后端收到后,返回一个心跳消息,
        webSocket.send(heartbeat.pingMessage);
        // 心跳发送后,如果服务器超时未响应则断开,如果响应了会被重置心跳定时器
        heartbeat.pongTimeoutObj = setTimeout(() => {
          webSocket.close();
        }, heartbeat.timeout);
      } else {
        // 否则重连
        reconnect();
      }
    }, heartbeat.interval);
  };

  /**
   * 连接成功事件
   */
  const onOpen = () => {
    //开启心跳
    startHeartbeat();
    state.reconnectTime = 0;
  };

  /**
   * 连接失败事件
   * @param e
   */
  const onError = () => {
    //重连
    reconnect();
  };

  /**
   * 连接关闭事件
   * @param e
   */
  const onClose = () => {
    //重连
    reconnect();
  };

  /**
   * 接收服务器推送的信息
   * @param msgEvent 消息事件
   */
  const onMessage = (msgEvent: any) => {
    // 收到服务器信息,心跳重置并发送
    startHeartbeat();
    // 回调函数
    emit('rollback', msgEvent.data);
  };
</script>

使用 WebSocket 组件

js
<template>
  <!-- 全局WebSocket通讯组件 -->
  <global-websocket :uri="'/api/websocket/' + userInfo.id" @rollback="rollback" />
</template>

<script lang="ts" setup>
  /**
   * 定义参数
   */
  const GlobalWebsocket = defineAsyncComponent(() => import('@/components/Websocket/index.vue'));
  
</script>

<style lang="scss" scoped>
</style>

WebSocket连接测试

使用 ApiFox 调试工具,输入请求地址:

js
ws://127.0.0.1:8081/api/websocket/1

总结

通过以上实现方案,可以快速集成 WebSocket 功能,适用于实时聊天、通知推送、实时数据更新等场景。如果需要支持大规模并发,可以结合外部消息代理(如 RabbitMQ)使用。

小蚂蚁云团队 · 提供技术支持