Netty集成ProtoBuf开发私有协议

2022年6月4日09:58:56

Netty集成ProtoBuf开发私有协议

私有协议

广义上区分,通信协议可以分为公有协议和私有协议。由于私有协议的灵活性,它往往会在某个公司或者组织内部使用,按需定制,也因为如此,升级起来会非常方便,灵活性好。绝大多数的私有协议传输层都基于TCP/IP,所以利用Netty的NIO TCP协议栈可以非常方便地进行私有协议的定制和开发。

通信模型

Netty集成ProtoBuf开发私有协议

(1) Netty协议栈客户端发送握手请求消息,携带节点ID等有效身份认证信息;
(2) Netty 协议栈服务端对握手请求消息进行合法性校验,包括节点ID有效性校验、节点重复登录校验和IP地址合法性校验,校验通过后,返回登录成功的握手应答消息:
(3)链路建立成功之后,客户端发送业务消息;
(4)链路成功之后,服务端发送心跳消息;
(5)链路建立成功之后,客户端发送心跳消息;
(6)链路建立成功之后,服务端发送业务消息;
(7)服务端退出时,服务端关闭连接,客户端感知对方关闭连接后,被动关闭客户端连接。

ProtoBuf数据格式

syntax = "proto3";
option java_package = "com.fy.protobuf";
option java_outer_classname="CustomMessageData";

message MessageData{
    int64 length = 1;
    Content content = 2;
    enum DataType {
        REQ_LOGIN = 0;  //上线登录验证环节 等基础信息上报
        RSP_LOGIN = 1;  //返回上线登录状态与基础信息
        PING = 2;  //心跳
        PONG = 3;  //心跳
        REQ_ACT = 4;  //动作请求
        RSP_ACT = 5;  //动作响应
        REQ_CMD = 6;  //指令请求
        RSP_CMD = 7;  //指令响应
        REQ_LOG = 8 ;//日志请求
        RSP_LOG = 9;  //日志响应
    }
    DataType order = 3;
    message Content{
        int64 contentLength = 1;
        string data = 2;
    }
}

开发步骤

tip????下列步骤有点吃力的小伙伴可以看看之前的文章:https://blog.csdn.net/kunfeisang5551/article/details/107957256

1、在D盘protobuf路径下执行命令:protoc.exe --java_out=D:\protobuf CustomMsg.proto

2、将生成的文件拷贝到项目中

开始Coding~

1、新建maven项目,引入依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.51.Final</version>
</dependency>
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.11.0</version>
</dependency>

2、创建服务端启动代码

public class CustomServer {
    public void bind(int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap
                    .group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    //消息头定长
                                    .addLast(new ProtobufVarint32FrameDecoder())
                                    //解码指定的消息类型
                                    .addLast(new ProtobufDecoder(CustomMessageData.MessageData.getDefaultInstance()))
                                    //消息头设置长度
                                    .addLast(new ProtobufVarint32LengthFieldPrepender())
                                    //解码
                                    .addLast(new ProtobufEncoder())
                                    //心跳检测,超过设置的时间将会抛出异常ReadTimeoutException
                                    .addLast(new ReadTimeoutHandler(8))
                                    //消息处理
                                    .addLast(new CustomServerHandler())
                                    //心跳响应
                                    .addLast(new CustomServerHeartBeatHandler());
                        }
                    });
            // 绑定端口同步等待启动成功
            ChannelFuture sync = bootstrap.bind(port).sync();

            // 等待服务监听端口关闭
            sync.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

3、创建服务端消息处理代码

public class CustomServerHandler extends ChannelInboundHandlerAdapter {

    private String[] whiteIPv4List = {"127.0.0.1", "192.168.1.188"};
    public static ConcurrentHashMap nodeCheck = new ConcurrentHashMap();

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        CustomMessageData.MessageData messageData = (CustomMessageData.MessageData) msg;
        if (messageData.getOrder() == CustomMessageData.MessageData.DataType.UNRECOGNIZED) {
            // 无法识别的消息类型
            ctx.close();
        }

        if (messageData.getOrder() == CustomMessageData.MessageData.DataType.REQ_LOGIN) {
            // 检查重复登录
            String nodeIndex = ctx.channel().remoteAddress().toString();
            if (nodeCheck.contains(nodeIndex)) {
                // 重复登录
                ctx.writeAndFlush(builderResp(false));
                return;
            } else {
                InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
                String ip = socketAddress.getAddress().getHostAddress();
                boolean isOk = false;
                // 检查白名单
                for (String s : whiteIPv4List) {
                    if (s.equals(ip)) {
                        isOk = true;
                        break;
                    }
                }
                // 成功响应
                CustomMessageData.MessageData responseData = isOk ? builderResp(true) : builderResp(false);
                if (isOk) {
                    nodeCheck.put(nodeIndex, true);
                }
                ctx.writeAndFlush(responseData);
            }
        } else {
            //心跳消息处理
            ctx.fireChannelRead(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        nodeCheck.remove(ctx.channel().remoteAddress().toString());
        if (ctx.channel().isActive()) {
            ctx.close();
        }
    }

    public CustomMessageData.MessageData builderResp(boolean isOk) {
        String r = isOk ? "SUCCESS" : "FAILED";
        CustomMessageData.MessageData.Content responseContent = CustomMessageData.MessageData.Content.newBuilder().setData(r).setContentLength(r.length()).build();
        CustomMessageData.MessageData responseData = CustomMessageData.MessageData.newBuilder().setOrder(CustomMessageData.MessageData.DataType.RSP_LOGIN).setContent(responseContent).build();
        return responseData;
    }
}

4、创建服务端心跳响应代码

public class CustomServerHeartBeatHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        CustomMessageData.MessageData messageData = (CustomMessageData.MessageData) msg;
        if (messageData.getOrder() == CustomMessageData.MessageData.DataType.PING) {
            CustomMessageData.MessageData req = CustomMessageData.MessageData.newBuilder()
                    .setOrder(CustomMessageData.MessageData.DataType.PONG).build();
            System.out.println("Send-Client:PONG,time:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
            ctx.writeAndFlush(req);
        } else {
            ctx.fireChannelRead(msg);
        }
    }
}

5、创建客户端启动代码

public class CustomClient {
    public void bind(int port) {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    .addLast(new ProtobufVarint32FrameDecoder())
                                    .addLast(new ProtobufDecoder(CustomMessageData.MessageData.getDefaultInstance()))
                                    .addLast(new ProtobufVarint32LengthFieldPrepender())
                                    .addLast(new ProtobufEncoder())
                                    // 消息处理
                                    .addLast(new CustomClientHandler())
                                    // 心跳响应
                                    .addLast(new CustomClientHeartBeatHandler());
                        }
                    });
            ChannelFuture f = b.connect("127.0.0.1", port).sync();

            f.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 短线重连 定时5秒
            group.execute(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(5);
                    bind(port);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
//            group.shutdownGracefully();
        }
    }
}

6、创建客户端消息处理代码

这里的逻辑主要是通道激活后马上发送业务消息,然后保持心跳

public class CustomClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        CustomMessageData.MessageData reqData = CustomMessageData
                .MessageData
                .newBuilder()
                .setOrder(CustomMessageData.MessageData.DataType.REQ_LOGIN)
                .build();
        ctx.channel().writeAndFlush(reqData);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        CustomMessageData.MessageData respData = (CustomMessageData.MessageData) msg;
        if (respData.getOrder() == CustomMessageData.MessageData.DataType.RSP_LOGIN) {
            // 响应登录请求处理逻辑
            boolean equals = respData.getContent().getData().equals("SUCCESS");
            if (equals) {
                System.out.println("Receive-Server:LoginSuccess,time:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
                System.out.println(respData.toString());
                // 传递下一个handler
                ctx.fireChannelRead(msg);
            } else {
                // 登录失败
                if (ctx.channel().isActive()) {
                    ctx.close();
                }
            }
        } else {
            // 响应心跳处理逻辑
            ctx.fireChannelRead(msg);
        }

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        if (ctx.channel().isActive()) {
            ctx.close();
        }
    }
}

7、创建客户端心跳保持代码

public class CustomClientHeartBeatHandler extends ChannelInboundHandlerAdapter {

    private static ScheduledFuture heartbeatFuture;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        CustomMessageData.MessageData messageData = (CustomMessageData.MessageData) msg;
        if (messageData.getOrder() == CustomMessageData.MessageData.DataType.RSP_LOGIN) {
            // 登录成功后保持心跳 间隔为5秒
            heartbeatFuture = ctx.executor().scheduleAtFixedRate(() -> {
                CustomMessageData.MessageData req = CustomMessageData.MessageData.newBuilder()
                        .setOrder(CustomMessageData.MessageData.DataType.PING).build();
                System.out.println("Send-Server:PING,time:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
                ctx.writeAndFlush(req);
            }, 0, 5, TimeUnit.SECONDS);
        } else if (messageData.getOrder() == CustomMessageData.MessageData.DataType.PONG) {
            System.out.println("Receive-Server:PONG,time:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
            System.out.println();
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 发生异常就取消心跳保持
        if (heartbeatFuture != null) {
            heartbeatFuture.cancel(true);
            heartbeatFuture = null;
        }
        ctx.fireExceptionCaught(cause);
    }
}

8、启动服务端

public class Server {
    public static void main(String[] args) throws Exception {
        new CustomServer().bind(8080);
    }
}

9、启动客户端

public class Client {
    public static void main(String[] args) {
        new CustomClient().bind(8080);
    }
}

控制台打印

1、客户端

Netty集成ProtoBuf开发私有协议

Receive-Server:LoginSuccess,time:2020-08-12 17:31:47
content {
  contentLength: 7
  data: "SUCCESS"
}
order: RSP_LOGIN

Send-Server:PING,time:2020-08-12 17:31:47
Receive-Server:PONG,time:2020-08-12 17:31:47

Send-Server:PING,time:2020-08-12 17:31:52
Receive-Server:PONG,time:2020-08-12 17:31:52

Send-Server:PING,time:2020-08-12 17:31:57
Receive-Server:PONG,time:2020-08-12 17:31:57

Send-Server:PING,time:2020-08-12 17:32:02
Receive-Server:PONG,time:2020-08-12 17:32:02

我们可以看到,当客户端发送登录请求后,服务端响应登录成功消息,然后交替打印心跳保持信息,间隔为5秒。

2、服务端

Netty集成ProtoBuf开发私有协议

Send-Client:PONG,time:2020-08-12 17:31:47
Send-Client:PONG,time:2020-08-12 17:31:52
Send-Client:PONG,time:2020-08-12 17:31:57
Send-Client:PONG,time:2020-08-12 17:32:02
Send-Client:PONG,time:2020-08-12 17:32:07

服务端响应登录请求后交替打印心跳保持信息。

3、测试服务端异常

我们先停掉服务端,看看客户端有啥反应,客户端日志:

Connection refused: no further information

客户端5秒打印一次异常信息,说明短线重连逻辑正常

我们接着再启动服务端,看看客户端有啥反应

Netty集成ProtoBuf开发私有协议

io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: /127.0.0.1:8080
Caused by: java.net.ConnectException: Connection refused: no further information
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
Receive-Server:LoginSuccess,time:2020-08-12 17:44:15
content {
  contentLength: 7
  data: "SUCCESS"
}
order: RSP_LOGIN

Send-Server:PING,time:2020-08-12 17:44:15
Receive-Server:PONG,time:2020-08-12 17:44:15

Send-Server:PING,time:2020-08-12 17:44:20
Receive-Server:PONG,time:2020-08-12 17:44:20

可以看到由异常转为正常啦~

通过测试可以验证是否符合私有协议的约定:

(1)客户端是否能够正常发起重连:
(2)重连成功之后,不再重连:
(3)断连期间,心跳定时器停止工作,不再发送心跳请求消息;
(4)服务端重启成功之后,允许客户端重新登录;
(5)服务端重启成功之后,客户端能够重连和握手成功:
(6)重连成功之后,双方的心跳能够正常互发。
(7)性能指标:重连期间,客户端资源得到了正常回收,不会导致句柄等资源泄漏。

GitHub服务端地址:https://github.com/GoodBoy2333/netty-server-maven.git

GitHub客户端地址:https://github.com/GoodBoy2333/netty-client-maven.git

  • 作者:fangyan2333
  • 原文链接:https://blog.csdn.net/kunfeisang5551/article/details/107963222
    更新时间:2022年6月4日09:58:56 ,共 10806 字。