springboot+vue+websocket实现前后端即时通知

2022-06-20 12:39:44

准备

后端引入pom依赖

<!-- Spring WebSocket module. No version is declared here, so it is presumably supplied by the parent POM or dependency management (TODO confirm). -->
<dependency><groupId>org.springframework</groupId><artifactId>spring-websocket</artifactId></dependency>

前端引入websocket

npm install --save reconnecting-websocket

在前端项目的环境变量配置文件(如 .env)中增加后端域名地址
在这里插入图片描述

# Host and port of the back-end; the front-end reads it as process.env.VUE_APP_DOMAIN when building the WebSocket URL.
VUE_APP_DOMAIN='192.168.0.199:38080'

后端服务搭建

WebSocketServer

@ServerEndpoint(value="/webSocket/{userId}",encoders={WebSocketEncoder.class})@Component@DatapublicclassWebSocketServer{/**
     * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
     */privatestaticAtomicInteger onlineNum=newAtomicInteger();/**
     * concurrent包的线程安全Set,用来存放每个客户端对应的WebSocketServer对象。
     */privatestaticConcurrentHashMap<String,Session> sessionPools=newConcurrentHashMap<>();/**
     * 发送消息
     * @param msgType
     * @param session
     * @param message
     * @throws IOException
     * @throws EncodeException
     */publicstaticvoidsendMessage(String msgType,Session session,Object message)throwsIOException,EncodeException{if(session!=null){synchronized(session){if(messageinstanceofString){
                    session.getBasicRemote().sendObject(Result.success(message).setType("str"));}else{
                    session.getBasicRemote().sendObject(Result.success(message).setType(msgType));}}}}/**
     * 给指定用户发送信息
     * @param userName
     * @param message
     */publicstaticvoidsendInfo(String userName,String message){Session session= sessionPools.get(userName);try{sendMessage(null,session, message);}catch(Exception e){
            e.printStackTrace();}}/**
     * 建立连接成功调用
     * @param session
     * @param userId
     */@OnOpenpublicstaticvoidonOpen(Session session,@PathParam(value="userId")String userId){try{if(ObjectUtil.isNull(userId)){sendMessage(null,session,null);}else{
                sessionPools.put(userId, session);addOnlineCount();System.out.println(RedisUtil.getUserName(userId)+"加入webSocket!当前人数为"+ onlineNum);sendMessage(null,session,"欢迎"+RedisUtil.getUserName(userId)+"加入连接!");}}catch(IOException|EncodeException e){
            e.printStackTrace();}}/**
     * 关闭连接时调用
     * @param userId
     */@OnClosepublicstaticvoidonClose(@PathParam(value="userId")String userId){
        sessionPools.remove(userId);subOnlineCount();System.out.println(RedisUtil.getUserName(userId)+"断开webSocket连接!当前人数为"+ onlineNum);}/**
     * 在线消息
     * @param message
     * @throws IOException
     */@OnMessagepublicstaticvoidonMessage(String message)throwsIOException{System.out.println(message);
        sessionPools.values().forEach(v->{try{sendMessage(null,v, message);}catch(Exception e){
                e.printStackTrace();}});}/**
     * 群发
     * @param message
     * @param userIds
     */publicstaticvoidsendByUserIds(String message,List<String> userIds){
        userIds.forEach(userId->{try{sendMessage(null, sessionPools.get(userId), message);}catch(IOException|EncodeException e){
                e.printStackTrace();}});}/**
     * 单发
     * @param msgType
     * @param obj
     * @param userId
     */publicstaticvoidsendByUserId(String msgType,Object obj,Long userId){try{sendMessage(msgType, sessionPools.get(String.valueOf(userId)), obj);}catch(IOException|EncodeException e){
            e.printStackTrace();}}/**
     * 错误时调用
     * @param session
     * @param throwable
     */@OnErrorpublicstaticvoidonError(Session session,Throwable throwable){System.out.println("发生错误");
        throwable.printStackTrace();}publicstaticvoidaddOnlineCount(){
        onlineNum.incrementAndGet();}publicstaticvoidsubOnlineCount(){
        onlineNum.decrementAndGet();}}

WebSocketConfig

@ConfigurationpublicclassWebSocketConfig{@Bean(name="serverEndpointExporter")publicServerEndpointExportergetServerEndpointExporterBean(){returnnewServerEndpointExporter();}}

WebSocketEncoder

publicclassWebSocketEncoderimplementsEncoder.Text<Result>{@OverridepublicStringencode(Result object)throwsEncodeException{if(object.getData()instanceofString){returnJSONUtil.toJsonStr(object);}else{BeanWrapper bw=newBeanWrapperImpl(object.getData());PropertyDescriptor[] pd= bw.getPropertyDescriptors();JSONObject jsonObject=newJSONObject();Arrays.stream(pd).filter(f->ObjectUtil.isNotEmpty(bw.getPropertyValue(f.getName()))).forEach(v->{
               jsonObject.putIfAbsent(v.getName(),bw.getPropertyValue(v.getName())+"");});JSONObject parseObj=JSONUtil.parseObj(object.getData());
           parseObj.putAll(jsonObject);//将java对象转换为json字符串returnJSONUtil.toJsonStr(object.setData(parseObj));}}@Overridepublicvoidinit(EndpointConfig endpointConfig){}@Overridepublicvoiddestroy(){}}

Result

@Data@Accessors(chain=true)publicclassResult{/**
     * 返回状态码 0 正常,-1 错误
     */Integer code;/**
     * 返回信息
     */String msg;/**
     * 返回数据
     */Object data;/**
     * 类型
     */String type;/**
     * 请求成功返回
     * @author 骆超
     * @date 2021/5/17
     * @param data 参数说明
     * @return com.test.activitidemo.config.Result
     */publicstaticResultsuccess(Object data){returnnewResult().setCode(0).setMsg("请求成功").setData(data);}/**
     * 请求失败返回
     * @author 骆超
     * @date 2021/5/17
     * @param msg 参数说明
     * @return com.test.activitidemo.config.Result
     */publicstaticResultfailed(String msg){returnnewResult().setCode(-1).setMsg(msg);}/**
     * 自定义设置返回信息
     * @author 骆超
     * @date 2021/5/17
     * @return com.test.activitidemo.config.Result
     */publicstaticResultget(){returnnewResult();}}

前端websocket.js

import ReconnectingWebSocket from 'reconnecting-websocket';
import store from '@/store'
import { Notice } from 'iview'
import { seenMessage, getMessageData } from "@/api/data";
import router from "@/router";

/**
 * WebSocket helper: opens an auto-reconnecting connection to the back-end
 * and turns incoming "msg" notifications into iview notices.
 */
export default {
  /**
   * Create the WebSocket connection for the given user.
   * @param userId id appended to the /webSocket/ endpoint path
   */
  createWebsocket(userId) {
    const httpURL = process.env.VUE_APP_DOMAIN;
    // Browsers block plain ws:// from pages served over https, so follow the page's scheme.
    const scheme = window.location.protocol === 'https:' ? 'wss://' : 'ws://';
    this.websocket = new ReconnectingWebSocket(scheme + httpURL + `/webSocket/` + userId);

    // Wrap the handlers in arrow functions so that `this` inside them is this
    // helper object rather than the socket that fired the event.
    this.websocket.onerror = (event) => this.websocketOnerror(event);
    this.websocket.onopen = (event) => this.websocketOnopen(event);
    this.websocket.onmessage = (event) => this.websocketOnmessage(event);
    this.websocket.onclose = (event) => this.websocketOnclose(event);

    // beforeunload is fired on window, not on the socket. Close the
    // connection when the page goes away so the server does not see an
    // abrupt disconnect. Replace any listener from an earlier call.
    if (this.beforeUnloadHandler) {
      window.removeEventListener('beforeunload', this.beforeUnloadHandler);
    }
    this.beforeUnloadHandler = () => this.websocketOnbeforeunload();
    window.addEventListener('beforeunload', this.beforeUnloadHandler);
  },

  // Called when the connection reports an error.
  websocketOnerror() {
    console.log('连接发生错误的回调方法');
  },

  // Called when the connection has been established.
  websocketOnopen() {
  },

  /**
   * Called for every incoming message. Messages of type "msg" are stored and
   * shown as a clickable notice; everything else is only logged.
   */
  websocketOnmessage(event) {
    let obj;
    try {
      obj = JSON.parse(event.data);
    } catch (e) {
      // Not JSON: log the raw frame instead of throwing inside the handler.
      console.log(event.data);
      return;
    }
    switch (obj.type) {
      case "msg":
        store.commit('setMsgData', obj.data)
        // Keep the notice on screen for 10 seconds.
        Notice.config({
          duration: 10
        })
        Notice.info({
          title: obj.data.title,
          name: obj.data.id,
          desc: obj.data.content,
          render: h => {
            return h('span', {
              style: {
                cursor: "pointer"
              },
              on: {
                click: () => {
                  switch (obj.data.msgType) {
                    case 1:
                    case "1":
                      router.push({ name: 'activiti-tasks' })
                      break;
                    default:
                      console.log(obj.data)
                      break
                  }
                  // Mark the message as seen, then refresh the stored list.
                  seenMessage({ id: obj.data.id }).then(() => {
                    store.commit('setMsgData', [])
                    getMessageData({}).then(res => {
                      // Accept both the number 0 and the string '0' as success.
                      if (String(res.code) === '0') {
                        store.commit('setMsgData', res.data)
                        Notice.close(obj.data.id)
                      }
                    })
                  })
                }
              }
            }, obj.data.content)
          }
        });
        break;
      default:
        console.log(obj.data)
    }
  },

  // Called when the connection has been closed.
  websocketOnclose(event) {
    console.log("连接已关闭")
  },

  // Called when the window is about to unload: actively close the connection.
  websocketOnbeforeunload() {
    this.closeWebSocket();
    console.log('监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常');
  },

  // Close the WebSocket connection, if one has been created.
  closeWebSocket() {
    if (this.websocket) {
      this.websocket.close();
    }
  },
}

前端在初次登录的时候,调用createWebsocket并传入用户id,与后端建立连接;后端需要向该用户推送消息时,调用WebSocketServer中的sendByUserId方法即可

  • 作者:Ailurophile_Lc
  • 原文链接:https://blog.csdn.net/Rex9910/article/details/124820834
    更新时间:2022-06-20 12:39:44