Netty教程 解码器

2022-09-19 13:27:22

https://juejin.cn/post/6889632659979862029

ByteToMessageDecoder

看看ByteToMessageDecoder这个解码器的channelRead方法

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        //存放解析出来的结果
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            first = cumulation == null;
            //累加字节流
            cumulation = cumulator.cumulate(ctx.alloc(),
                                            first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
            //解码
            callDecode(ctx, cumulation, out);
        } catch (DecoderException e) {
            throw e;
        } catch (Exception e) {
            throw new DecoderException(e);
        } finally {
            try {
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++numReads >= discardAfterReads) {
                    numReads = 0;
                    discardSomeReadBytes();
                }

                int size = out.size();
                firedChannelRead |= out.insertSinceRecycled();
                //向后传播ByteBuf
                fireChannelRead(ctx, out, size);
            } finally {
                out.recycle();
            }
        }
    } else {
        ctx.fireChannelRead(msg);
    }
}

主要过程分为3步

  • 累加字节流
  • 调用子类的decode方法进行解析
  • 将解析到的ByteBuf向后传播

累加字节流

这部分代码片段如下

//累加字节流
first = cumulation == null;
cumulation = cumulator.cumulate(ctx.alloc(),
                                            first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);

通过一个累加器cumulator累加字节流

首先会判断是否是第一次累加,来决定是否要分配空的bytebuf

cumulatorCumulator类的实例,来看看它的cumulate方法

public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
    if (!cumulation.isReadable() && in.isContiguous()) {
        ?/省略
    }
    try {
        final int required = in.readableBytes();
        if (required > cumulation.maxWritableBytes() ||
            (required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) ||
            cumulation.isReadOnly()) {
            //读入的数据大于剩余容量,进行扩容
            return expandCumulation(alloc, cumulation, in);
        }
        //读入数据
        cumulation.writeBytes(in, in.readerIndex(), required);
        //调整bytebuf的写索引
        in.readerIndex(in.writerIndex());
        return cumulation;
    } finally {
        //省略
    }
}

可以看到,就是通过一个bytebuf累加字节流,在适当的时候进行扩容

调用子类的decode方法进行解析

接下来看看callDecode方法

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    try {
        while (in.isReadable()) {
            //记录解码前out的大小
            int outSize = out.size();

            if (outSize > 0) {
                //如果已经有解码结果,将其向后传播
                fireChannelRead(ctx, out, outSize);
                out.clear();
                if (ctx.isRemoved()) {
                    //如果节点被移除了,退出循环
                    break;
                }
                outSize = 0;
            }

            //记录一下decode前的可读字节数
            int oldInputLength = in.readableBytes();
            //调用子类的decode方法
            decodeRemovalReentryProtection(ctx, in, out);
            
            if (ctx.isRemoved()) {
                //如果节点被移除了,退出循环
                break;
            }

            if (outSize == out.size()) {
                if (oldInputLength == in.readableBytes()) {
                    //目前的字节数不足以解析出结果
                    break;
                } else {
                    continue;
                }
            }

            if (oldInputLength == in.readableBytes()) {
                //没有使用读入的字节,但是却解析出结果,抛出异常
                throw new DecoderException(
                    StringUtil.simpleClassName(getClass()) +
                    ".decode() did not read anything but decoded a message.");
            }

            if (isSingleDecode()) {
                //如果每次只解析一条message,退出循环
                break;
            }
        }
    } catch (DecoderException e) {
        throw e;
    } catch (Exception cause) {
        throw new DecoderException(cause);
    }
}

继续看看decodeRemovalReentryProtection方法

final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
    throws Exception {
    decodeState = STATE_CALLING_CHILD_DECODE;
    try {
        //调用子类的decode方法
        decode(ctx, in, out);
    } finally {
        boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
        decodeState = STATE_INIT;
        if (removePending) {
            //该节点正在等待被移除,向后传播结果然后移除该节点
            fireChannelRead(ctx, out, out.size());
            out.clear();
            handlerRemoved(ctx);
        }
    }
}

向后传播ByteBuf

来看看finally块里的fireChannelRead方法

static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
    for (int i = 0; i < numElements; i ++) {
        ctx.fireChannelRead(msgs.getUnsafe(i));
    }
}

可以看到,这里就是将解析出来的bytebuf一个个向后传播

FixedLengthFrameDecoder

FixedLengthFrameDecoder是基于固定长度(字节的长度)的解码器,它集成自ByteToMessageDecoder,在构造函数里传入frameLength表示长度

来看看它的decode方法

protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    Object decoded = decode(ctx, in);
    if (decoded != null) {
        out.add(decoded);
    }
}

protected Object decode(
    @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
    if (in.readableBytes() < frameLength) {
        return null;
    } else {
        return in.readRetainedSlice(frameLength);
    }
}

可以看到,解码的逻辑很简单

  • 如果当前累加器里可读的字节数小于frameLength,返回null,out里不会添加解析结果
  • 如果当前累加器里可读的字节数大于等于frameLength,那么读取frameLength个字节构建新的bytebuf,添加到out结果里

LineBasedFrameDecoder

LineBasedFrameDecoder是基于行的解码器

先来看看他的几个比较重要的成员变量

//能够解码的最大长度
private final int maxLength;
//超出最大长度后是否要立即抛出异常,默认为false
private final boolean failFast;
//解析出来的结果是否丢弃换行符,默认为true
private final boolean stripDelimiter;

接下来看看它的decode方法

protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    Object decoded = decode(ctx, in);
    if (decoded != null) {
        out.add(decoded);
    }
}

同样,它的核心方法在另一个decode方法里

protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
    //寻找行换行符,如果使用'\r\n'换行,那么位置指向\r
    final int eol = findEndOfLine(buffer);
    if (!discarding) {
        //非丢弃模式
        if (eol >= 0) {
            //如果找到了行末
            final ByteBuf frame;
            //计算该行长度(不包括换行符长度)
            final int length = eol - buffer.readerIndex();
            //根据换行符长度
            final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;

            if (length > maxLength) {
                //行长度超过最大长度,读入数据并且抛出异常
                buffer.readerIndex(eol + delimLength);
                fail(ctx, length);
                return null;
            }

            if (stripDelimiter) {
                //如果去掉换行符,
                frame = buffer.readRetainedSlice(length);
                buffer.skipBytes(delimLength);
            } else {
                frame = buffer.readRetainedSlice(length + delimLength);
            }

            //返回解析到的结果
            return frame;
        } else {
            //没找到换行符
            final int length = buffer.readableBytes();
            if (length > maxLength) {
                //超出最大长度,将读入的数据都丢弃,进入丢弃模式
                discardedBytes = length;
                buffer.readerIndex(buffer.writerIndex());
                discarding = true;
                offset = 0;
                if (failFast) {
                    fail(ctx, "over " + discardedBytes);
                }
            }
            return null;
        }
    } else {
        if (eol >= 0) {
            //丢弃模式下,读取到了换行符,退出丢弃模式
            final int length = discardedBytes + eol - buffer.readerIndex();
            final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
            buffer.readerIndex(eol + delimLength);
            discardedBytes = 0;
            discarding = false;
            if (!failFast) {
                fail(ctx, length);
            }
        } else {
            //丢弃模式下,没有读取到换行符,继续丢弃数据
            discardedBytes += buffer.readableBytes();
            buffer.readerIndex(buffer.writerIndex());
            // We skip everything in the buffer, we need to set the offset to 0 again.
            offset = 0;
        }
        return null;
    }
}

逻辑也比较简单

  • 非丢弃模式下
    • 若读取到了换行符,那么读取出该行然后返回结果
    • 若没有读取到换行符,并且累加器内的字节数超过最大长度,那么丢弃这部分数据,进入丢弃模式
  • 丢弃模式下
    • 若读取到了换行符,丢弃直到换行符的数据,退出丢弃模式
    • 若没有读取到换行符,继续丢弃数据

LengthFieldBasedFrameDecoder

LengthFieldBasedFrameDecoder是基于长度域解码器

这个类有几个比较重要的成员变量

//长度域的起始偏移
private final int lengthFieldOffset;
//长度域的长度
private final int lengthFieldLength;
//长度矫正(有时候长度域存储的值是整个message的长度,需要设置该值得到真正数据段的长度)
private final int lengthAdjustment;
//跳过的起始字节数(有时候长度域前面有header部分,需要设置该值来跳过这部分找到长度域)
private final int initialBytesToStrip;
//长度域前的header长度加上长度域的长度
private final int lengthFieldEndOffset;

更详细的解释可以去查看官方文档

接下来看看他的decode方法,与之前一样,真正的解码逻辑在另一个decode方法里

protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
    if (discardingTooLongFrame) {
        //丢弃过长的数据
        discardingTooLongFrame(in);
    }

    if (in.readableBytes() < lengthFieldEndOffset) {
        //累加器中的可读字节数太少
        return null;
    }

    //计算长度域在累加器里的实际偏移数
    int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
    //获取无矫正前的真正数据段的长度(即没有将lengthAdjustment考虑进去)
    long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);

    if (frameLength < 0) {
        //长度小于0,抛出异常
        failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset);
    }

    //将lengthAdjustment考虑进去,计算整个frame的长度
    frameLength += lengthAdjustment + lengthFieldEndOffset;

    if (frameLength < lengthFieldEndOffset) {
        //整个frame的长度<长度域前的header长度+长度域长度,抛出异常
        failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);
    }

    if (frameLength > maxFrameLength) {
        //超出最大长度
        exceededFrameLength(in, frameLength);
        return null;
    }

    // never overflows because it's less than maxFrameLength
    int frameLengthInt = (int) frameLength;
    if (in.readableBytes() < frameLengthInt) {
        //累加器内的可读字节数小于frame长度
        return null;
    }

    if (initialBytesToStrip > frameLengthInt) {
        //跳过的字节数超过frame长度,抛出异常
        failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, initialBytesToStrip);
    }
    //跳过指定字节数
    in.skipBytes(initialBytesToStrip);

    // extract frame
    int readerIndex = in.readerIndex();
    int actualFrameLength = frameLengthInt - initialBytesToStrip;
    //解析出frame数据包的数据
    ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
    //设置累加器的读索引
    in.readerIndex(readerIndex + actualFrameLength);
    return frame;
}

逻辑比较简单,概括下来就是这几步

  • 计算数据包长度的长度
  • 处理跳过的字节
  • 解析出数据包
  • 作者:fyygree
  • 原文链接:https://blog.csdn.net/fengyuyeguirenenen/article/details/124307927
    更新时间:2022-09-19 13:27:22