springboot实现websocket客户端,含重连机制

2023-02-17 19:55:16

一、简介

  • 因为用前端实现的客户端,比方说小程序,网络不稳定,会经常断,所以考虑用java实现客户端,稳定。
  • java版的重连机制确实花费了好多时间才正好。
  • 重连的时候刚开始没有加同步,导致定时器发心跳频繁的时候上次还没有完全创建完就又创建了一个客户端,加同步避免了。
  • sendMsg的时候之前没有加超时,可能有同时存在多个建立连接占用资源的隐患,加了超时。额 此处限制被我在生产环境去掉了,因为这个时间不好控制,短了的话会一直连不上。。。暂时再考虑这块有没有必要处理。
  • 还有websocket比较恶心的是 每次error的时候都开一个新线程去通知,如果你一直失败,就等着cpu爆炸吧。
  • WebSocket有五种状态:NOT_YET_CONNECTED、CONNECTING、OPEN、CLOSING、CLOSED, 只有not_yet_connected的时候才可以connect, 一旦connect之后,状态改变了,就无法再connect了。

  • 代码的git地址为:https://github.com/1956025812/websocketdemo

  • 找时间要自己写个服务端,含踢人和一段时间没收到响应就踢人的效果;

二、实现

2.1 pom依赖

        <!-- websocket -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <dependency>
            <groupId>org.java-websocket</groupId>
            <artifactId>Java-WebSocket</artifactId>
            <version>1.3.5</version>
        </dependency>
        <!-- websocket -->

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.3.2</version>
        </dependency>

2.2 启动类

package com.example.websocketdemo;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@Slf4j
@SpringBootApplication
@EnableScheduling
public class WebsocketdemoApplication implements ApplicationRunner {

    @Autowired
    private WebSocketClientFactory webSocketClientFactory;

    @Override
    public void run(ApplicationArguments args) {
        // 项目启动的时候打开websocket连接
        webSocketClientFactory.retryOutCallWebSocketClient();
    }

    public static void main(String[] args) {
        SpringApplication.run(WebsocketdemoApplication.class, args);
    }


}

2.3 WebSocketClientFactory

 

package com.example.websocketdemo;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.WebSocket;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.net.URI;
import java.net.URISyntaxException;

@Component
@Slf4j
@Data
public class WebSocketClientFactory {

    public static final String outCallWebSockertUrl = "ws://IP:端口";

    private WebSocketClient outCallWebSocketClientHolder;


    /**
     * 创建websocket对象
     *
     * @return WebSocketClient
     * @throws URISyntaxException
     */
    private WebSocketClient createNewWebSocketClient() throws URISyntaxException {
        WebSocketClient webSocketClient = new WebSocketClient(new URI(outCallWebSockertUrl)) {
            @Override
            public void onOpen(ServerHandshake serverHandshake) {
            }

            @Override
            public void onMessage(String msg) {
                log.info("接收信息为:{}", msg);
            }

            @Override
            public void onClose(int i, String s, boolean b) {
                log.info("关闭连接");
                retryOutCallWebSocketClient();
            }

            @Override
            public void onError(Exception e) {
                log.error("连接异常");
                retryOutCallWebSocketClient();
            }
        };
        webSocketClient.connect();
        return webSocketClient;
    }


    /**
     * 项目启动或连接失败的时候打开新链接,进行连接认证
     * 需要加同步,不然会创建多个连接
     */
    public synchronized WebSocketClient retryOutCallWebSocketClient() {
        try {
            // 关闭旧的websocket连接, 避免占用资源
            WebSocketClient oldOutCallWebSocketClientHolder = this.getOutCallWebSocketClientHolder();
            if (null != oldOutCallWebSocketClientHolder) {
                log.info("关闭旧的websocket连接");
                oldOutCallWebSocketClientHolder.close();
            }

            log.info("打开新的websocket连接,并进行认证");
            WebSocketClient webSocketClient = this.createNewWebSocketClient();
            String sendOpenJsonStr = "{\"event\":\"connect\",\"sid\":\"1ae4e3167b3b49c7bfc6b79awww691562914214595\",\"token\":\"df59eba89\"}";
            this.sendMsg(webSocketClient, sendOpenJsonStr);

            // 每次创建新的就放进去
            this.setOutCallWebSocketClientHolder(webSocketClient);
            return webSocketClient;
        } catch (URISyntaxException e) {
            e.printStackTrace();
            log.error(e.getMessage());
        }
        return null;
    }


    /**
     * 发送消息
     * 注意: 要加超时设置,避免很多个都在同时超时占用资源
     *
     * @param webSocketClient 指定的webSocketClient
     * @param message         消息
     */
    public void sendMsg(WebSocketClient webSocketClient, String message) {
        log.info("websocket向服务端发送消息,消息为:{}", message);
        long startOpenTimeMillis = System.currentTimeMillis();
        while (!webSocketClient.getReadyState().equals(WebSocket.READYSTATE.OPEN)) {
            log.debug("正在建立通道,请稍等");
            long currentTimeMillis = System.currentTimeMillis();
            if(currentTimeMillis - startOpenTimeMillis >= 5000) {
                log.error("超过5秒钟还未打开连接,超时,不再等待");
                return;
            }
        }
        webSocketClient.send(message);
    }



    @Async
    @Scheduled(fixedRate = 10000)
    public void sendHeartBeat() {
        log.info("定时发送websocket心跳");
        try {
            WebSocketClient outCallWebSocketClientHolder = this.getOutCallWebSocketClientHolder();

            if (null == outCallWebSocketClientHolder) {
                log.info("当前连接还未建立,暂不发送心跳消息");
                return;
            }

            // 心跳的请求串,根据服务端来定
            String heartBeatMsg = "{\"event\":\"heartbeat\",\"sid\":\"1ae4e3167b3b49c7bfc6b79a74f2296915222214595\"}";
            this.sendMsg(outCallWebSocketClientHolder, heartBeatMsg);
        } catch (Exception e) {
            e.printStackTrace();
            log.error("发送心跳异常");
            retryOutCallWebSocketClient();
        }
    }

}

 

三、测试

3.0 控制类

package com.example.websocketdemo;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class OutCallWebSocketController {

    @Autowired
    private WebSocketClientFactory webSocketClientFactory;

    @GetMapping("/sendCall")
    public void sendCall() {
        String heartBeatMsg = "{\"event\":\"heartbeat\",\"sid\":\"1ae4e3167b3b49c7bfc6b79a74f229691562914214595\"}";
        webSocketClientFactory.sendMsg(webSocketClientFactory.getOutCallWebSocketClientHolder(), heartBeatMsg);
    }

}

3.1 日志

3.1.1 启动的日志

2020-01-02 21:56:43.365  INFO 14800 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService 'taskScheduler'
2020-01-02 21:56:43.377  INFO 14800 --- [   scheduling-1] c.e.w.WebSocketClientFactory             : 定时发送websocket心跳
2020-01-02 21:56:43.377  INFO 14800 --- [   scheduling-1] c.e.w.WebSocketClientFactory             : 当前连接还未建立,暂不发送心跳消息
2020-01-02 21:56:43.391  INFO 14800 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8000 (http) with context path ''
2020-01-02 21:56:43.393  INFO 14800 --- [           main] c.e.w.WebsocketdemoApplication           : Started WebsocketdemoApplication in 1.208 seconds (JVM running for 1.617)
2020-01-02 21:56:43.394  INFO 14800 --- [           main] c.e.w.WebSocketClientFactory             : 打开新的websocket连接,并进行认证
2020-01-02 21:56:43.398  INFO 14800 --- [           main] c.e.w.WebSocketClientFactory             : websocket向服务端发送消息,消息为:{"event":"connect","sid":"1ae4e3167b3b49c7bfc6b79awww691562914214595","token":"df59eba89"}
2020-01-02 21:56:43.410  INFO 14800 --- [       Thread-5] c.e.w.WebSocketClientFactory             : 接收信息为:{"result":2,"reqBody":{"event":"connect","sid":"1ae4e3167b3b49c7bfc6b79awww691562914214595","token":"df59eba89"},"event":"connect","resultMessage":"企业用户不存在","sid":"1ae4e3167b3b49c7bfc6b79awww691562914214595"}
2020-01-02 21:56:53.377  INFO 14800 --- [   scheduling-1] c.e.w.WebSocketClientFactory             : 定时发送websocket心跳
2020-01-02 21:56:53.377  INFO 14800 --- [   scheduling-1] c.e.w.WebSocketClientFactory             : websocket向服务端发送消息,消息为:{"event":"heartbeat","sid":"1ae4e3167b3b49c7bfc6b79a74f2296915222214595"}
Disconnected from the target VM, address: '127.0.0.1:55203', transport: 'socket'
2020-01-02 21:56:55.993  INFO 14800 --- [extShutdownHook] o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService 'taskScheduler'
2020-01-02 21:56:55.993  INFO 14800 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'

3.1.2 测试日志 TODO

 

 

  • 作者:请叫我猿叔叔
  • 原文链接:https://blog.csdn.net/qq_35206261/article/details/103761773
    更新时间:2023-02-17 19:55:16