Netty框架之协议应用二(RPC开发实战之Dubbo)

2022年6月4日08:57:26

前言

netty框架马上就进入尾声了,小编没有特别深入的讲解,第一是网络编程确实挺难的,第二用好netty其实是挺不容易的一件事情,尤其都是异步的情况下,今天小编继续为大家带来开发实战,上次分享了redis客户端和websocket弹幕功能的简单实现,这次为大家带来相对比较高档的rpc框架底层网络通信,今天主要以dubbo为例,希望大家有所收获。

RPC

定义

RPC为远程服务调用,即客户端远程调用服务端的方法,然后服务端返回响应或异常。常用的RPC解决方案有JAVA RMI,webService,Http Invoker,Dubbo,SpringCloud等等。

去中心化架构

传统集中式架构:
下图是小编理解的中心化架构

Netty框架之协议应用二(RPC开发实战之Dubbo)

中心化架构其优点为:架构简单,客户端调用的时候可以跨语言,缺点的话所有的调用都会进过ngnix(这里就可以理解为中心,大家都得进过他,无论是调用还是返回响应),ngnix一旦挂了之后就会是服务挂了,当然如果ngnix部署集群也会让架构变的复杂。
去中心化架构
如下图:

Netty框架之协议应用二(RPC开发实战之Dubbo)
这里的话客户端调用服务端不需要进过中心直连调用。
去中心化架构简单描述后,继续来看一下rpc框架组成。

框架组成

Netty框架之协议应用二(RPC开发实战之Dubbo)
这个架构是不是似曾相识,这里如果看过小编的dubbo分享就觉得面熟了。
上面最小化实现rpc框架就是最下面的rpc协议就可以了,这样两个服务就可以通信即可。接着咱们来介绍一下rpc协议。

协议报文

这里小编直接用dubbo协议来说明报文:

Netty框架之协议应用二(RPC开发实战之Dubbo)
上面请求头主要有16个字节,如果看过小编的前基本应用的使用后,看到这个就可以使用netty来进行消息的拆包以及编解码。那小编接下来继续说明编解码的过程。

概设过程

这边在写代码之前,小编先分享一下设计的思路,以及一些调用的逻辑。
首先是编解码:编解码占网络传输中必须且固定的,先看下图:

Netty框架之协议应用二(RPC开发实战之Dubbo)
具体已经在上图解释清楚了,接下来看器调用过程即各个组件功能。

Netty框架之协议应用二(RPC开发实战之Dubbo)
上图是比较简单的,接下来是至关重要的的从客户端到服务整个流程的调用过程

Netty框架之协议应用二(RPC开发实战之Dubbo)
容小编解释一下:

  • 从客户端到服务端的调用涉及到了四个线程,分别是客户端以及服务端的业务线程和IO线程
  • 发起掉用写入消息体的内容是上面的Transfer,而编码request则为客户端发起的请求。而Transfer中的Request包含了接口,方法以及参数
  • 写入到socket中的为bytebuf,其经过Bytebuf -> head -> unsafe -> nio socket (doWrite) -> java nio channel -> socket
  • 写入到socket则到达服务端,服务端的io线程通过多路复用选择器select轮询,之后调用read
  • 读取到内容后,这边的读取流程和上面相似 unsafe read -> pipeline fireChannelRead,拿到了ByteBuf,然后解码request,根据上面的解码工具类
  • 从ByteBuf拿到Transfer,之后交给业务的handler,之后涉及到服务端业务线程的处理,业务处理后返回了结果或者报错信息(这里同上其实是Transfer)
  • 之后又交还给io线程,再次将Response进行编码(服务端的响应),Bytebuf写到socket
  • 回到客户端io线程后,再次有select进行轮询,读取到内容Bytebuf,解码成Transfer,Transfer中的response进行反序列化拿到结果填充回执,
  • 客户端拿到回执,释放等待。

注意事项
第一:加入客户端A和B的请求,客户端怎样拿到服务端回来的A响应和B相应呢,这里就需要Transfer里面的id,即协议中的id,不过如果是协议中的id,那就需要做请求的时候放入到一个map中来保存。
第二:既然知道使用id来区分请求响应,那什么时候放入到map中, 怎么保证线程安全,那最好是线程安全的map,不过高并发的时候,对系统很不友好,所以放入到map的时候也在io线程中执行。
第三:如何在io线程放入map中,这里是用eventloop的submit,写入消息完成后监听并写入map

讲完理论小编不是纯粹的理论派,还是代码实战派

代码实战

编解码工具以及传输类

Transfer类

publicclassTransfer{publicstaticfinalbyte STATUS_ERROR=0;publicstaticfinalbyte STATUS_OK=1;publicstaticfinalbyte STATUS_ILLEGAL=2;publicstaticfinalbyte SERIALIZABLE_JAVA=1;publicstaticfinalbyte SERIALIZABLE_HESSIAN2=2;publicstaticfinalbyte SERIALIZABLE_JSON=3;boolean request;byte serializableId;// 1:java 2:hessian2 3:jsonboolean twoWay;boolean heartbeat;long id;byte status;// 1正常 0失败 2请求非法Object target;publicTransfer(long id){this.id= id;}}

编解码工具类

publicclassRpcCodecextendsByteToMessageCodec{privatestaticfinalint HEADER_LENGTH=16;privatestaticfinalshort MAGIC=0xdad;privatestaticfinalByteBuf MAGIC_BUF=Unpooled.copyShort(MAGIC);privatestaticfinalbyte FLAG_REQUEST=(byte)0x80;//1000 0000privatestaticfinalbyte FLAG_TWO_WAY=(byte)0x40;//0100 0000privatestaticfinalbyte FLAG_EVENT=(byte)0x20;//0010 0000privatestaticfinalint SERIALIZATION_MASK=0x1f;//0001 1111// 编码@Overrideprotectedvoidencode(ChannelHandlerContext ctx,Object msg,ByteBuf out){if(msginstanceofTransfer){doEncode((Transfer) msg, out);}else{thrownewIllegalArgumentException();}}//解码@Overrideprotectedvoiddecode(ChannelHandlerContext ctx,ByteBuf in,List out){Transfer transfer=doDecode(in);if(transfer!=null){
            out.add(transfer);}}// 编码protectedvoiddoEncode(Transfer data,ByteBuf buf){byte[] header=newbyte[HEADER_LENGTH];Bytes.short2bytes(MAGIC, header);

        header[2]= data.serializableId;if(data.request) header[2]|= FLAG_REQUEST;if(data.twoWay) header[2]|= FLAG_TWO_WAY;if(data.heartbeat) header[2]|= FLAG_EVENT;if(!data.request) header[3]= data.status;Bytes.long2bytes(data.id, header,4);// id 占8个字节int len=0;byte[] body=newbyte[0];if(!data.heartbeat){
            body=serialize(data.serializableId, data.target);
            len= body.length;}Bytes.int2bytes(len, header,12);
        buf.writeBytes(header);
        buf.writeBytes(body);}// 解码protectedTransferdoDecode(ByteBuf in){int index=ByteBufUtil.indexOf(MAGIC_BUF, in);//是否有魔数if(index<0){returnnull;}//消息头是否完整if(!in.isReadable(index+ HEADER_LENGTH)){returnnull;}byte[] header=newbyte[HEADER_LENGTH];//      in.getBytes(index, header);ByteBuf slice= in.slice();
        slice.readBytes(header);int length=Bytes.bytes2int(header,12);//消息体是否完整if(!in.isReadable(index+ HEADER_LENGTH+ length)){returnnull;//需要更多的字节}Transfer transfer=newTransfer(Bytes.bytes2long(header,4));
        transfer.heartbeat=(header[2]& FLAG_EVENT)!=0;
        transfer.request=(header[2]& FLAG_REQUEST)!=0;
        transfer.twoWay=(header[2]& FLAG_TWO_WAY)!=0;
        transfer.serializableId=(byte)(header[2]& SERIALIZATION_MASK);
        transfer.status= header[3];if(!transfer.heartbeat){byte[] content=newbyte[length];//          in.getBytes(index + HEADER_LENGTH, bytes);
            slice.readBytes(content);
            transfer.target=deserialize(transfer.serializableId, content);}//跳过已经读取的
        in.skipBytes(index+ HEADER_LENGTH+ length);return transfer;}// 序列化privatebyte[]serialize(byte serializableId,Object target){if(serializableId==Transfer.SERIALIZABLE_JAVA){//JAVAByteArrayOutputStream out;try{
                out=newByteArrayOutputStream();ObjectOutputStream stream=newObjectOutputStream(out);
                stream.writeObject(target);}catch(IOException e){thrownewRuntimeException(e);}return out.toByteArray();}else{thrownewUnsupportedOperationException();}}// 反序列化privateObjectdeserialize(byte serializableId,byte[] bytes){if(serializableId==Transfer.SERIALIZABLE_JAVA){//JAVAtry{ObjectInputStream stream=newObjectInputStream(newByteArrayInputStream(bytes));return stream.readObject();}catch(IOException|ClassNotFoundException e){thrownewRuntimeException(e);}}else{thrownewUnsupportedOperationException();}}}

客户端代码

publicclassRpcClient{staticAtomicLong atomicLong=newAtomicLong(100);privateChannel channel;privateMap<Long,Promise<Response>> results=newHashMap<>();publicstaticlonggetNextId(){return atomicLong.getAndIncrement();}publicvoidinit(String address,int port)throwsInterruptedException{Bootstrap bootstrap=newBootstrap();
        bootstrap.group(newNioEventLoopGroup(1)).channel(NioSocketChannel.class);
        bootstrap.handler(newChannelInitializer<Channel>(){@OverrideprotectedvoidinitChannel(Channel ch){
                ch.pipeline().addLast("codec",newRpcCodec());
                ch.pipeline().addLast("resultSet",newResultFill());// 结果集填充}});ChannelFuture connect= bootstrap.connect(address, port);
        channel= connect.sync().channel();System.out.println("连接成功");//// 每隔 两秒发送心跳
        channel.eventLoop().scheduleWithFixedDelay(()->{Transfer transfer=newTransfer(getNextId());
            transfer.heartbeat=true;
            channel.writeAndFlush(transfer);},2000,2000,TimeUnit.MILLISECONDS);}publicResponseinvokerRemote(Class serverInterface,String methodDesc,Object[] args)throwsInterruptedException,ExecutionException,TimeoutException{Request request=newRequest(serverInterface.getName(), methodDesc);
        request.setArgs(args);Transfer transfer=newTransfer(getNextId());
        transfer.request=true;
        transfer.serializableId=Transfer.SERIALIZABLE_JAVA;
        transfer.target= request;DefaultPromise<Response> resultPromise=newDefaultPromise(channel.eventLoop());// 写入成功后添加 结果
        channel.writeAndFlush(transfer).addListener(future->{// IO线程if(future.cause()!=null){// 写入失败
                        resultPromise.setFailure(future.cause());//写入失败必须处理}else{// 写入成功
                        results.put(transfer.id, resultPromise);}});return resultPromise.get(10000,TimeUnit.MILLISECONDS);}privateclassResultFillextendsSimpleChannelInboundHandler<Transfer>{@OverrideprotectedvoidchannelRead0(ChannelHandlerContext ctx,Transfer msg){if(msg.heartbeat){System.out.println(String.format("服务端心跳返回:%s",
                        ctx.channel().remoteAddress()));}else{Promise<Response> promise= results.remove(msg.id);
                promise.setSuccess((Response) msg.target);// 填充结果}}}public<T>TgetRemoteService(Class<T> serviceInterface){assert serviceInterface.isInterface();Object o=Proxy.newProxyInstance(getClass().getClassLoader(),newClass[]{serviceInterface},newInvocationHandler(){@OverridepublicObjectinvoke(Object proxy,Method method,Object[] args)throwsException{if(Object.class.equals(method.getDeclaringClass())){return method.invoke(this, args);}String methodDescriptor= method.getName()+Type.getMethodDescriptor(method);Response response=invokerRemote(serviceInterface, methodDescriptor, args);if(response.getError()!=null){thrownewRuntimeException("远程服务调用异常:", response.getError());}return response.getResult();}});return(T) o;}}

服务端代码:

publicclassRpcServer{ExecutorService threadPool=Executors.newFixedThreadPool(500);privateMap<String,ServiceBean> register=newHashMap<>();publicvoidstart(int port)throwsInterruptedException{ServerBootstrap bootstrap=newServerBootstrap();EventLoopGroup boss=newNioEventLoopGroup(1);EventLoopGroup work=newNioEventLoopGroup(8);
        bootstrap.group(boss, work).channel
  • 作者:木兮君
  • 原文链接:https://blog.csdn.net/a1032722788/article/details/119765271
    更新时间:2022年6月4日08:57:26 ,共 7972 字。