服务端:
Maven导入依赖
首先在pom.xml文件中导入websocket的依赖项
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
导入fastjson工具,用于JSON格式转换
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
配置类WebSocketConfig
ServerEndpointExporter是Spring框架中的一个类,主要用于在Spring Boot应用中启动和管理WebSocket服务器端点。
在Spring Boot内置容器(嵌入式容器)中运行时,必须由ServerEndpointExporter提供ServerEndpointExporter bean,它会在启动时自动扫描和注册应用中的WebSocket端点。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
@Component
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
编写WebSocket操作类
WebSocket操作类定义了WebSocket中的onOpen()、onMessage()、onClose()、onError()等方法,同时提供了一个发送广播(全部订阅用户)和点对点信息的方法。
限定多播本质仍为广播,这里通过携带header的方式让对应前端进行指定接收,当前端检测header不对应时则不使用收到的消息,在某种意义上实现多播效果。
@Component
@Slf4j
@ServerEndpoint("/api/websocket/{userId}")
public class WebSocket {
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
//用户ID
private String userId;
//concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
//虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。
private static CopyOnWriteArraySet<WebSocket> webSockets =new CopyOnWriteArraySet<>();
// 用来存在线连接用户信息
private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<String,Session>();
/**
* 链接成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam(value="userId")String userId) {
try {
this.session = session;
this.userId = userId;
webSockets.add(this);
sessionPool.put(userId, session);
log.info("WebSocket消息有新的连接:"+userId+",总数为:"+webSockets.size());
} catch (Exception e) {
log.error("WebSocket异常-链接失败(onOpen),原因:" + e.getMessage());
}
}
/**
* 链接关闭调用的方法
*/
@OnClose
public void onClose() {
try {
webSockets.remove(this);
sessionPool.remove(this.userId);
log.info("WebSocket消息连接断开,总数为:"+webSockets.size());
} catch (Exception e) {
log.error("WebSocket异常-链接关闭失败(onClose),原因:" + e.getMessage());
}
}
/**
* 收到客户端消息后调用的方法
*
* @param message
*/
@OnMessage
public void onMessage(String message) {
log.info("WebSocket消息收到客户端消息:"+message);
}
/** 发送错误时的处理
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("用户错误,原因:"+error.getMessage());
log.error("WebSocket异常-错误信息(onError),原因:" + error.getMessage());
}
// 此为广播消息
public void sendAllMessage(String message) {
log.info("WebSocket消息-广播消息:"+message);
for(WebSocket webSocket : webSockets) {
try {
if(webSocket.session.isOpen()) {
webSocket.session.getAsyncRemote().sendText(message);
}
} catch (Exception e) {
log.error("WebSocket异常-广播消息异常(sendAllMessage),原因:" + e.getMessage());
}
}
}
// 此为单点消息
public void sendOneMessage(String userId, String message) {
Session session = sessionPool.get(userId);
if (session != null && session.isOpen()) {
try {
log.info("WebSocket消息-点对点消息:"+message);
session.getAsyncRemote().sendText(message);
} catch (Exception e) {
log.error("WebSocket异常-点对点消息异常(sendOneMessage),原因:" + e.getMessage());
}
}
}
//此为限定多播消息
public void sendMultiMessage(String header,Object data) {
for(WebSocket webSocket : webSockets) {
Message message = new Message(header,data);
try {
log.info("WebSocket消息-组播消息:"+header);
if(webSocket.session.isOpen()) {
webSocket.session.getAsyncRemote().sendText(JSON.toJSONString(message));
}
} catch (Exception e) {
log.error("WebSocket异常-组播消息异常(sendAllMessage),原因:" + e.getMessage());
}
}
}
}
message类
message类包含一个header和需要发送的内容
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message {
private String header;
private Object data;
}
测试类WebSocketController
接下来编写一个测试类用于测试向前端页面发送消息
@RestController
@RequestMapping("/ws/msg")
public class WebSocketController {
@Resource
private WebSocket webSocket;
@Resource
private NoticeMapper noticeMapper;
@RequestMapping("/all") // 全局消息推送测试
public void all() {
//创建业务消息信息
// JSONObject obj = new JSONObject();
// obj.put("msg","这是发给所有人的消息");
// //全体发送
// webSocket.sendAllMessage(obj.toString());
Notifications notice = noticeMapper.getById(1);
Result result = Result.success(notice);
webSocket.sendAllMessage(JSON.toJSONString(result));
}
@RequestMapping("/{userId}/{msg}") // 指定用户消息推送测试
public void sendUser(@PathVariable String userId,@PathVariable String msg) {
//创建业务消息信息
JSONObject obj = new JSONObject();
obj.put("msg",msg);
webSocket.sendOneMessage(userId, JSON.toJSONString(obj));
}
@RequestMapping("/{userId}")//指定用户推送结构体测试
public void sendStruct(@PathVariable String userId){
Notifications notice = noticeMapper.getById(1);
webSocket.sendOneMessage(userId,JSON.toJSONString(notice));
}
@RequestMapping("/multi") // 组播消息推送测试
public void multi() {
//创建业务消息信息
Notifications notice = noticeMapper.getById(1);
webSocket.sendMultiMessage("notice",notice);
}
}
vue3环境搭建
初始化WebSocket
其中JSON.parse用于JSON格式转换,if(result.value.header == null)用于判断是否为多播消息,演示代码的状况是不接收组播消息
<script setup>
// websocket 相关逻辑
import {ElMessage} from "element-plus";
const message = ref('')
const websock = ref(null)
// 初始化 WebSocket
import {onMounted, onUnmounted, ref} from "vue";
const msg = ref('')
const initWebSocket = () => {
//ws地址
const url = "ws://localhost:8080/api/websocket/1"
websock.value = new WebSocket(url)
websock.value.onopen = websocketonopen
websock.value.onerror = websocketonerror
websock.value.onmessage = (e) => {
//服务器主动更新数据
const result = ref()
result.value = JSON.parse(e.data)
if(result.value.header == null){
msg.value = JSON.parse(e.data)
ElMessage.success("接收到消息");
}
}
websock.value.onclose = websocketclose
}
const websocketonopen = () => {
console.log("WebSocket连接成功")
}
const websocketonerror = (e) => {
console.log("WebSocket连接发生错误", e)
}
const websocketclose = (e) => {
console.log("connection closed", e.code)
}
onMounted(() => {
initWebSocket()
})
onUnmounted(() => {
if (websock.value) {
websock.value.close()
}
})
</script>
<template>
<p>
接收到的信息:{{msg}}
</p>
</template>
<style scoped>
</style>