准备
后端引入pom依赖
<dependency><groupId>org.springframework</groupId><artifactId>spring-websocket</artifactId></dependency>
前端引入websocket
npm install --save reconnecting-websocket
在项目的环境配置中增加后端域名地址(前端通过 process.env.VUE_APP_DOMAIN 读取)
VUE_APP_DOMAIN='192.168.0.199:38080'
后端服务搭建
WebSocketServer
@ServerEndpoint(value="/webSocket/{userId}",encoders={WebSocketEncoder.class})@Component@DatapublicclassWebSocketServer{/**
* 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
*/privatestaticAtomicInteger onlineNum=newAtomicInteger();/**
* concurrent包的线程安全Set,用来存放每个客户端对应的WebSocketServer对象。
*/privatestaticConcurrentHashMap<String,Session> sessionPools=newConcurrentHashMap<>();/**
* 发送消息
* @param msgType
* @param session
* @param message
* @throws IOException
* @throws EncodeException
*/publicstaticvoidsendMessage(String msgType,Session session,Object message)throwsIOException,EncodeException{if(session!=null){synchronized(session){if(messageinstanceofString){
session.getBasicRemote().sendObject(Result.success(message).setType("str"));}else{
session.getBasicRemote().sendObject(Result.success(message).setType(msgType));}}}}/**
* 给指定用户发送信息
* @param userName
* @param message
*/publicstaticvoidsendInfo(String userName,String message){Session session= sessionPools.get(userName);try{sendMessage(null,session, message);}catch(Exception e){
e.printStackTrace();}}/**
* 建立连接成功调用
* @param session
* @param userId
*/@OnOpenpublicstaticvoidonOpen(Session session,@PathParam(value="userId")String userId){try{if(ObjectUtil.isNull(userId)){sendMessage(null,session,null);}else{
sessionPools.put(userId, session);addOnlineCount();System.out.println(RedisUtil.getUserName(userId)+"加入webSocket!当前人数为"+ onlineNum);sendMessage(null,session,"欢迎"+RedisUtil.getUserName(userId)+"加入连接!");}}catch(IOException|EncodeException e){
e.printStackTrace();}}/**
* 关闭连接时调用
* @param userId
*/@OnClosepublicstaticvoidonClose(@PathParam(value="userId")String userId){
sessionPools.remove(userId);subOnlineCount();System.out.println(RedisUtil.getUserName(userId)+"断开webSocket连接!当前人数为"+ onlineNum);}/**
* 在线消息
* @param message
* @throws IOException
*/@OnMessagepublicstaticvoidonMessage(String message)throwsIOException{System.out.println(message);
sessionPools.values().forEach(v->{try{sendMessage(null,v, message);}catch(Exception e){
e.printStackTrace();}});}/**
* 群发
* @param message
* @param userIds
*/publicstaticvoidsendByUserIds(String message,List<String> userIds){
userIds.forEach(userId->{try{sendMessage(null, sessionPools.get(userId), message);}catch(IOException|EncodeException e){
e.printStackTrace();}});}/**
* 单发
* @param msgType
* @param obj
* @param userId
*/publicstaticvoidsendByUserId(String msgType,Object obj,Long userId){try{sendMessage(msgType, sessionPools.get(String.valueOf(userId)), obj);}catch(IOException|EncodeException e){
e.printStackTrace();}}/**
* 错误时调用
* @param session
* @param throwable
*/@OnErrorpublicstaticvoidonError(Session session,Throwable throwable){System.out.println("发生错误");
throwable.printStackTrace();}publicstaticvoidaddOnlineCount(){
onlineNum.incrementAndGet();}publicstaticvoidsubOnlineCount(){
onlineNum.decrementAndGet();}}
WebSocketConfig
@ConfigurationpublicclassWebSocketConfig{@Bean(name="serverEndpointExporter")publicServerEndpointExportergetServerEndpointExporterBean(){returnnewServerEndpointExporter();}}
WebSocketEncoder
publicclassWebSocketEncoderimplementsEncoder.Text<Result>{@OverridepublicStringencode(Result object)throwsEncodeException{if(object.getData()instanceofString){returnJSONUtil.toJsonStr(object);}else{BeanWrapper bw=newBeanWrapperImpl(object.getData());PropertyDescriptor[] pd= bw.getPropertyDescriptors();JSONObject jsonObject=newJSONObject();Arrays.stream(pd).filter(f->ObjectUtil.isNotEmpty(bw.getPropertyValue(f.getName()))).forEach(v->{
jsonObject.putIfAbsent(v.getName(),bw.getPropertyValue(v.getName())+"");});JSONObject parseObj=JSONUtil.parseObj(object.getData());
parseObj.putAll(jsonObject);//将java对象转换为json字符串returnJSONUtil.toJsonStr(object.setData(parseObj));}}@Overridepublicvoidinit(EndpointConfig endpointConfig){}@Overridepublicvoiddestroy(){}}
Result
@Data@Accessors(chain=true)publicclassResult{/**
* 返回状态码 0 正常,-1 错误
*/Integer code;/**
* 返回信息
*/String msg;/**
* 返回数据
*/Object data;/**
* 类型
*/String type;/**
* 请求成功返回
* @author 骆超
* @date 2021/5/17
* @param data 参数说明
* @return com.test.activitidemo.config.Result
*/publicstaticResultsuccess(Object data){returnnewResult().setCode(0).setMsg("请求成功").setData(data);}/**
* 请求失败返回
* @author 骆超
* @date 2021/5/17
* @param msg 参数说明
* @return com.test.activitidemo.config.Result
*/publicstaticResultfailed(String msg){returnnewResult().setCode(-1).setMsg(msg);}/**
* 自定义设置返回信息
* @author 骆超
* @date 2021/5/17
* @return com.test.activitidemo.config.Result
*/publicstaticResultget(){returnnewResult();}}
前端websocket.js
import ReconnectingWebSocket from 'reconnecting-websocket';
import store from '@/store'
import {Notice} from 'iview'
import {seenMessage, getMessageData} from "@/api/data";
import router from "@/router";

/**
 * WebSocket client helper: opens an auto-reconnecting connection for a user and
 * turns incoming "msg" payloads into clickable notices.
 */
export default {
  /**
   * Creates the websocket connection for the given user.
   * @param userId id appended to the /webSocket/ endpoint path
   */
  createWebsocket(userId) {
    const httpURL = process.env.VUE_APP_DOMAIN;
    this.websocket = new ReconnectingWebSocket(`ws://` + httpURL + `/webSocket/` + userId);
    // Called when the connection reports an error
    this.websocket.onerror = this.websocketOnerror;
    // Called when the connection has been established
    this.websocket.onopen = this.websocketOnopen;
    // Called when a message is received
    this.websocket.onmessage = this.websocketOnmessage;
    // Called when the connection is closed
    this.websocket.onclose = this.websocketOnclose;
    // beforeunload is fired on window, not on the socket, so the close hook has to be
    // registered there. The arrow function keeps `this` bound to this module, and the
    // listener is registered only once even if createWebsocket is called again.
    if (!this.beforeUnloadListener) {
      this.beforeUnloadListener = () => this.websocketOnbeforeunload();
      window.addEventListener('beforeunload', this.beforeUnloadListener);
    }
  },

  // Called when the connection reports an error
  websocketOnerror() {
    console.log('连接发生错误的回调方法');
  },

  // Called when the connection has been established
  websocketOnopen() {
  },

  /**
   * Called when a message is received. Payloads of type "msg" are stored and shown
   * as a notice; everything else is only logged.
   * @param event message event whose data is a JSON string
   */
  websocketOnmessage(event) {
    let obj = JSON.parse(event.data)
    switch (obj.type) {
      case "msg":
        store.commit('setMsgData', obj.data)
        Notice.config({
          duration: 10 // keep the notice visible for 10 seconds
        })
        Notice.info({
          title: obj.data.title,
          name: obj.data.id,
          desc: obj.data.content,
          render: h => {
            return h('span', {
              style: {
                cursor: "pointer"
              },
              on: {
                click: () => {
                  switch (obj.data.msgType) {
                    case 1:
                    case "1":
                      router.push({
                        name: 'activiti-tasks'
                      })
                      break;
                    default:
                      console.log(obj.data)
                      break
                  }
                  // Mark the message as seen, then reload the message list
                  seenMessage({id: obj.data.id}).then(res => {
                    store.commit('setMsgData', [])
                    getMessageData({}).then(res => {
                      // NOTE(review): compares against the string '0' - confirm that
                      // getMessageData returns the code as a string, not a number
                      if (res.code === '0') {
                        store.commit('setMsgData', res.data)
                        Notice.close(obj.data.id)
                      }
                    })
                  })
                }
              }
            }, obj.data.content)
          }
        });
        break;
      default:
        console.log(obj.data)
    }
  },

  // Called when the connection is closed
  websocketOnclose(event) {
    console.log("连接已关闭")
  },

  // Called when the window is about to unload: closes the websocket first, so the
  // window does not go away while the connection is still open
  websocketOnbeforeunload() {
    this.closeWebSocket();
    console.log('监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常');
  },

  // Closes the websocket connection, if one has been created
  closeWebSocket() {
    if (this.websocket) {
      this.websocket.close();
    }
  },
}
前端在初次登录的时候,调用createWebsocket传入用户id,与后端建立连接;后端需要向该用户推送消息时,调用WebSocketServer中的sendByUserId方法即可