Dubbo源码学习系列整合网络框架Netty详解

2022年6月4日09:29:57

前言

           上篇文章写到了利用zookeeper的特性实现缓存服务地址列表,接下来我们可以借助Netty的优点对程序进行改造,使其即支持Http容器的tomcat,又支持Dubbo协议的Netty, 实际上Dubbo也是支持两种协议,一种是Dubbo协议,一种是Http, 其中dubbo协议就是借助Netty实现的。

Netty客户端与服务端交互流程

         1)  Netty客户端通过IP和端口绑定,准备好JSON数据包。

         2)  Netty  client 将要发送的数据包通过一系列的encoder, 将数据加密,然后发送给socket,由socket 发送给服务端。

         3)  Netty server 接收到Netty client 发送过来的数据后,然后通过一系列的解码器,将数据进行解码,解码完毕后,可以自定义serverHandler处理响应逻辑,最后将响应的数据通过一系列的编码器编码,然后发送到socket中,最终发生给客户端。

         4)  Netty client 收到服务端发过来的消息后,又先会通过一系列的解码器将消息解码,然后将解码后的消息转交给clientHandler处理,处理完后,再将消息加密,通过socket返回给服务端。

1. 写一个NettyServer

       在写NettyServer之前,我们需要先去学习Netty相关的网络基础知识,然后再去写Server。

       NettyServer的启动流程如下:

       1) 绑定ip,监听端口。

       2)  创建一个Nio事件组,该事件组是IO多路复用的体现。

       3)  NioServerSocketChannel  注册selector事件处理器, 准备接收连接。

       4)  初始化Channel信道, 数据是通过Channel来传输的。

       5)  使用serverHandler 处理业务逻辑。

package com.luban.protocol.dubbo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

public class NettyServer {

    public void start(String hostName, int port) {
        try {
            final ServerBootstrap bootstrap = new ServerBootstrap();
            NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
            bootstrap.group(eventLoopGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast("decoder", new ObjectDecoder(ClassResolvers
                                    .weakCachingConcurrentResolver(this.getClass()
                                            .getClassLoader())));
                            pipeline.addLast("encoder", new ObjectEncoder());
                            pipeline.addLast("handler", new NettyServerHandler());
                        }
                    });
            bootstrap.bind(hostName, port).sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


    public static void main(String[] args) {

        NettyServer server = new NettyServer();
        server.start("127.0.0.1", 8001);
    }


}

 注: encoder和decoder的位置可以互换,但是都要写在handler前面,否则在调用的时候会出现有一端有TimeoutException

启动NettyServer:

启动成功后,接着写NettyClient!

2. 写一个NettyClient

        1) NettyClient和NettyServer的连接方式是TCP, TCP连接的建立是比较耗时的一个操作,因此最好使用多线程技术的线程池去管理多连接,刚开始初始化的时候会要点时间,连接用完了会归还到线程池里,然后再次使用只需要从线程池去选择一个连接使用即可。

        2) 我们可以选择使用CachedThreadPool,CachedThreadPool线程池是可以伸缩的,它会根据需要创建新的线程,然后再重用之前可使用的已创建的线程

        首先看一下一个关键的接口ChannelHandler,提供信道初始化的实现。

     ChannelHandler

        它有2个关键的实现类ChannelInboundHandlerAdapterChannelOutBoundHandlerAdapter,他们都可以向Pipeline大管道里读写数据, 区别是一个进入,一个是出去,ChannelInBoundHandlerAdapter在下文会用到。

ChannelInboundHandlerAdapter

        方法一:   channelActive(ChannelHandlerContext ctx);  在管道激活的时候初始化ChannelHandlerContext, 可用ChannelHandlerContext的wirteAndFlush()方法去向Pipeline写入数据, 实现ChannelInboundHandlerAdapter抽象类是必须重写此方法,要不然无法使用ctx去写数据。

        方法二:   chnnelRead(ChannelHandlerContext ctx, Object msg);   这个方法用来读取pipeline管道流过来的数据, msg为数据包。

     ChannelOutBoundHandlerAdapter

       主要方法:

        方法一:  bind(ChannelHandlerContext ctx, SocketAddress socketAddress, ChannelPromise channelPromise);   绑定IP, 监听端口。

 另外他们都实现了Callable接口, 此接口里只有一个方法call() , 因此我们可以在线程执行时,将客户端发送消息的逻辑放到call()方法里。

package com.example.dubbo.protocol.dubbo;

import com.example.dubbo.framework.Invocation;
import com.example.dubbo.framework.URL;
import com.example.dubbo.protocol.http.HttpClient;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NettyClient {

    public NettyClientHandler client = null;

    private URL url;

    // 使用线程池开启线程
    private static ExecutorService executorService = Executors.newCachedThreadPool();

    public NettyClientHandler start() {
        client = new NettyClientHandler();

        final Bootstrap bootstrap = new Bootstrap();

        bootstrap.group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast("decoder", new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                        pipeline.addLast("encoder", new ObjectEncoder());
                        pipeline.addLast("handler", client);
                    }
                });

        // 连接socket
        try {
            bootstrap.connect(url.getHost(), url.getPort()).sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return client;
    }

    // 通过线程来发送信息给Server
    public Object send(URL url, Invocation invocation) {
        this.url = url;
        if (client == null) {
            client = start();
        }
        client.setInvocation(invocation);
        try {
            Object obj = executorService.submit(client).get();
            return obj;
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void main(String[] args) {
        //测试nettyClient往nettyServer发送message
        Invocation invocation = new Invocation("com.example.dubbo.provider.api.UserInterface", "sayHello", new Object[]{"bingbing"}, new Class[]{String.class});
        NettyClient client = new NettyClient();
        URL url = new URL("localhost", 8001);
        client.send(url, invocation);
    }


}

3.  用NettyClient测试NettyServer

 public static void main(String[] args) {
        //测试nettyClient往nettyServer发送message
        Invocation invocation = new Invocation("com.example.dubbo.provider.api.UserInterface", "sayHello", new Object[]{"bingbing"}, new Class[]{String.class});
        NettyClient client = new NettyClient();
        URL url = new URL("localhost", 8001);
        client.send(url, invocation);
    }

  消费方:

服务方:

注:

       1. 根据如上consumer控制台打印结果发现我在服务端已经将执行结果回写到了pipeline里,但消费方读取到的结果为null! 是什么原因呢?

        经过一番分析后,原因是result还没来得及被赋值就给返回出去了。发现是因为在执行call()方法时,当前线程没有等待result赋值后就返回了, 所以返回的result为null。

解决方法

       在发送消息后,用wait()阻塞一下,等待ChannelRead()读取到服务端发送过来的数据赋值给Result, 然后再返回result。


    @Override
    public Object call() throws Exception {
        // 在Call 方法执行逻辑
        System.out.println("向服务端发送消息...");
        ctx.writeAndFlush(invocation);
        wait();
        return result;
    }

   @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //读取消息
        this.result = msg;
        notify();
    }

    2. 重新启动consumer, consumer控制台报错:

查阅官网相关资料后,发现出现此错误的原因主要包含了以下几种情况:

当前线程不含有对象锁资源的时候去调用了wait()方法。

当前线程不含有对象锁资源的时候去调用了notify()方法。

当前线程不含有对象锁资源的时候去调用了notifyAll()方法。

      2.  在高并发情况下,如果有2个线程同时进入到call()方法里,会不会出现上述的报错问题?

          答案是肯定的,有可能会存在如下情况,上一个线程拿到对象资源锁执行wait(),但它还没执行完,下一个线程就又进入到call()方法去拿wait(),没有拿到资源锁就wait(), 所以报错。

解决方法

        加锁,在call()方法和channelRead()方法上添加synchronized关键字。

重写启动后,观察结果!

程序调通了后,就可以把netty客户端和服务端整合到之前的项目里。

4. 整合netty

     consumer:

provider:

     启动nettyServer

 重新启动观察结果:

借助Netty实现远程调用成功,至此,整合Netty完毕!

5.  failed to create a child event loop 报错问题

        稳定运行一段时间后,发现开始出现问题了,问题如下:

   Exception in thread "main" java.lang.IllegalStateException: failed to create a child event loop
    at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:68)
    at io.netty.channel.MultithreadEventLoopGroup.<init>(MultithreadEventLoopGroup.java:49)
    at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:61)
    at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:52)
    at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:44)
    at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:36)
    at com.example.dubbo.protocol.dubbo.NettyClient.start(NettyClient.java:36)
    at com.example.dubbo.protocol.dubbo.NettyClient.send(NettyClient.java:62)
    at com.example.dubbo.framework.ProxyFactory$1.invoke(ProxyFactory.java:40)
    at org.springframework.cglib.proxy.Proxy$ProxyImpl$$EnhancerByCGLIB$$2c24aae9.sayHello(<generated>)
    at com.example.dubbo.consumer.ConsumerApplication.main(ConsumerApplication.java:25)
Caused by: io.netty.channel.ChannelException: failed to open a new selector
    at io.netty.channel.nio.NioEventLoop.openSelector(NioEventLoop.java:128)
    at io.netty.channel.nio.NioEventLoop.<init>(NioEventLoop.java:120)
    at io.netty.channel.nio.NioEventLoopGroup.newChild(NioEventLoopGroup.java:87)
    at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:64)
    ... 10 more
Caused by: java.io.IOException: Unable to establish loopback connection
    at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:94)
    at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:61)
    at java.security.AccessController.doPrivileged(Native Method)
    at sun.nio.ch.PipeImpl.<init>(PipeImpl.java:171)
    at sun.nio.ch.SelectorProviderImpl.openPipe(SelectorProviderImpl.java:50)
    at java.nio.channels.Pipe.open(Pipe.java:155)
    at sun.nio.ch.WindowsSelectorImpl.<init>(WindowsSelectorImpl.java:127)
    at sun.nio.ch.WindowsSelectorProvider.openSelector(WindowsSelectorProvider.java:44)
    at io.netty.channel.nio.NioEventLoop.openSelector(NioEventLoop.java:126)
    ... 13 more
Caused by: java.net.SocketException: No buffer space available (maximum connections reached?): connect
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:454)
    at sun.nio.ch.Net.connect(Net.java:446)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
    at java.nio.channels.SocketChannel.open(SocketChannel.java:189)
    at sun.nio.ch.PipeImpl$Initializer$LoopbackConnector.run(PipeImpl.java:127)
    at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:76)
    ... 21 more


发现最终的问题是由于no buffer space available (maximum connections reached?): connect

用netstat -ano 命令查看连接情况, 发现有大量的9090端口的连接没有被释放。

发现是生成了大量的TCP连接。

使用jvisualvm查看内存使用情况, 发现内存几乎用满。

在资源有限的情况下CashedThreadPool 有可能会创建大量的线程来不及回收,默认回收时间是60s, 最终导致OOM。

可以通过Semaphore 进行加锁,相当于限流的作用。

源代码:dubbo-rpc: 手写dubbo仓库源码

  • 作者:Dream_it_possible!
  • 原文链接:https://bingbing.blog.csdn.net/article/details/116355589
    更新时间:2022年6月4日09:29:57 ,共 9749 字。