SpringBoot + Vue3 + Websocket的一个简单实践

服务端: Maven导入依赖 首先在pom.xml文件中导入websocket的依赖项 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>sprin

服务端:

Maven导入依赖

首先在pom.xml文件中导入websocket的依赖项

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

导入fastjson工具,用于JSON格式转换

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>

配置类WebSocketConfig

ServerEndpointExporter是Spring框架中的一个类,主要用于在Spring Boot应用中启动和管理WebSocket服务器端点。

在Spring Boot内置容器(嵌入式容器)中运行时,必须由ServerEndpointExporter提供ServerEndpointExporter bean,它会在启动时自动扫描和注册应用中的WebSocket端点。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
@Component
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

编写WebSocket操作类

WebSocket操作类定义了WebSocket中的onOpen()、onMessage()、onClose()、onError()等方法,同时提供了一个发送广播(全部订阅用户)和点对点信息的方法。

限定多播本质仍为广播,这里通过携带header的方式让对应前端进行指定接收,当前端检测header不对应时则不使用收到的消息,在某种意义上实现多播效果。

@Component
@Slf4j
@ServerEndpoint("/api/websocket/{userId}")
public class WebSocket {
    //与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;
    //用户ID
    private String userId;

    //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
    //虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。
    private static CopyOnWriteArraySet<WebSocket> webSockets =new CopyOnWriteArraySet<>();
    // 用来存在线连接用户信息
    private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<String,Session>();

    /**
     * 链接成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam(value="userId")String userId) {
        try {
            this.session = session;
            this.userId = userId;
            webSockets.add(this);
            sessionPool.put(userId, session);
            log.info("WebSocket消息有新的连接:"+userId+",总数为:"+webSockets.size());
        } catch (Exception e) {
            log.error("WebSocket异常-链接失败(onOpen),原因:" + e.getMessage());
        }
    }

    /**
     * 链接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        try {
            webSockets.remove(this);
            sessionPool.remove(this.userId);
            log.info("WebSocket消息连接断开,总数为:"+webSockets.size());
        } catch (Exception e) {
            log.error("WebSocket异常-链接关闭失败(onClose),原因:" + e.getMessage());
        }
    }
    /**
     * 收到客户端消息后调用的方法
     *
     * @param message
     */
    @OnMessage
    public void onMessage(String message) {
        log.info("WebSocket消息收到客户端消息:"+message);
    }

    /** 发送错误时的处理
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("用户错误,原因:"+error.getMessage());
        log.error("WebSocket异常-错误信息(onError),原因:" + error.getMessage());
    }


    // 此为广播消息
    public void sendAllMessage(String message) {
        log.info("WebSocket消息-广播消息:"+message);
        for(WebSocket webSocket : webSockets) {
            try {
                if(webSocket.session.isOpen()) {
                    webSocket.session.getAsyncRemote().sendText(message);
                }
            } catch (Exception e) {
                log.error("WebSocket异常-广播消息异常(sendAllMessage),原因:" + e.getMessage());
            }
        }
    }

    // 此为单点消息
    public void sendOneMessage(String userId, String message) {
        Session session = sessionPool.get(userId);
        if (session != null && session.isOpen()) {
            try {
                log.info("WebSocket消息-点对点消息:"+message);
                session.getAsyncRemote().sendText(message);
            } catch (Exception e) {
                log.error("WebSocket异常-点对点消息异常(sendOneMessage),原因:" + e.getMessage());
            }
        }
    }

    //此为限定多播消息
    public void sendMultiMessage(String header,Object data) {
        for(WebSocket webSocket : webSockets) {
            Message message = new Message(header,data);
            try {
                log.info("WebSocket消息-组播消息:"+header);
                if(webSocket.session.isOpen()) {
                    webSocket.session.getAsyncRemote().sendText(JSON.toJSONString(message));
                }
            } catch (Exception e) {
                log.error("WebSocket异常-组播消息异常(sendAllMessage),原因:" + e.getMessage());
            }
        }
    }

}

message类

message类包含一个header和需要发送的内容

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message {
    private String header;
    private Object data;
}

测试类WebSocketController

接下来编写一个测试类用于测试向前端页面发送消息

@RestController
@RequestMapping("/ws/msg")
public class WebSocketController {
    @Resource
    private WebSocket webSocket;
    @Resource
    private NoticeMapper noticeMapper;
    @RequestMapping("/all") // 全局消息推送测试
    public void all() {
        //创建业务消息信息
//        JSONObject obj = new JSONObject();
//        obj.put("msg","这是发给所有人的消息");
//        //全体发送
//        webSocket.sendAllMessage(obj.toString());
        Notifications notice = noticeMapper.getById(1);
        Result result = Result.success(notice);
        webSocket.sendAllMessage(JSON.toJSONString(result));
    }
    @RequestMapping("/{userId}/{msg}") // 指定用户消息推送测试
    public void sendUser(@PathVariable String userId,@PathVariable String msg) {
        //创建业务消息信息
        JSONObject obj = new JSONObject();
        obj.put("msg",msg);
        webSocket.sendOneMessage(userId, JSON.toJSONString(obj));
    }
    @RequestMapping("/{userId}")//指定用户推送结构体测试
    public void sendStruct(@PathVariable String userId){
        Notifications notice = noticeMapper.getById(1);
        webSocket.sendOneMessage(userId,JSON.toJSONString(notice));
    }
    @RequestMapping("/multi") // 组播消息推送测试
    public void multi() {
        //创建业务消息信息
        Notifications notice = noticeMapper.getById(1);
        webSocket.sendMultiMessage("notice",notice);
    }
}

vue3环境搭建

初始化WebSocket

其中JSON.parse用于JSON格式转换,if(result.value.header == null)用于判断是否为多播消息,演示代码的状况是不接收组播消息

<script setup>

// websocket 相关逻辑
import {ElMessage} from "element-plus";

const message = ref('')
const websock = ref(null)

// 初始化 WebSocket
import {onMounted, onUnmounted, ref} from "vue";

const msg = ref('')
const initWebSocket = () => {
  //ws地址
  const url = "ws://localhost:8080/api/websocket/1"
  websock.value = new WebSocket(url)

  websock.value.onopen = websocketonopen
  websock.value.onerror = websocketonerror
  websock.value.onmessage = (e) => {
    //服务器主动更新数据
    const result = ref()
    result.value = JSON.parse(e.data)
    if(result.value.header == null){
      msg.value = JSON.parse(e.data)
      ElMessage.success("接收到消息");
    }
  }
  websock.value.onclose = websocketclose
}

const websocketonopen = () => {
  console.log("WebSocket连接成功")
}

const websocketonerror = (e) => {
  console.log("WebSocket连接发生错误", e)
}

const websocketclose = (e) => {
  console.log("connection closed", e.code)
}

onMounted(() => {
  initWebSocket()
})

onUnmounted(() => {
  if (websock.value) {
    websock.value.close()
  }
})
</script>

<template>
  <p>
    接收到的信息:{{msg}}
  </p>
</template>

<style scoped>
</style>

LICENSED UNDER CC BY-NC-SA 4.0
Comment