https://juejin.cn/post/6889632659979862029
ByteToMessageDecoder
看看ByteToMessageDecoder
这个解码器的channelRead
方法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
//存放解析出来的结果
CodecOutputList out = CodecOutputList.newInstance();
try {
first = cumulation == null;
//累加字节流
cumulation = cumulator.cumulate(ctx.alloc(),
first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
//解码
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
try {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++numReads >= discardAfterReads) {
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
firedChannelRead |= out.insertSinceRecycled();
//向后传播ByteBuf
fireChannelRead(ctx, out, size);
} finally {
out.recycle();
}
}
} else {
ctx.fireChannelRead(msg);
}
}
主要过程分为3步
- 累加字节流
- 调用子类的decode方法进行解析
- 将解析到的ByteBuf向后传播
累加字节流
这部分代码片段如下
//累加字节流
first = cumulation == null;
cumulation = cumulator.cumulate(ctx.alloc(),
first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
通过一个累加器cumulator
累加字节流
首先会判断是否是第一次累加,来决定是否要分配空的bytebuf
cumulator
是Cumulator
类的实例,来看看它的cumulate
方法
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
if (!cumulation.isReadable() && in.isContiguous()) {
?/省略
}
try {
final int required = in.readableBytes();
if (required > cumulation.maxWritableBytes() ||
(required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) ||
cumulation.isReadOnly()) {
//读入的数据大于剩余容量,进行扩容
return expandCumulation(alloc, cumulation, in);
}
//读入数据
cumulation.writeBytes(in, in.readerIndex(), required);
//调整bytebuf的写索引
in.readerIndex(in.writerIndex());
return cumulation;
} finally {
//省略
}
}
可以看到,就是通过一个bytebuf累加字节流,在适当的时候进行扩容
调用子类的decode方法进行解析
接下来看看callDecode
方法
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
//记录解码前out的大小
int outSize = out.size();
if (outSize > 0) {
//如果已经有解码结果,将其向后传播
fireChannelRead(ctx, out, outSize);
out.clear();
if (ctx.isRemoved()) {
//如果节点被移除了,退出循环
break;
}
outSize = 0;
}
//记录一下decode前的可读字节数
int oldInputLength = in.readableBytes();
//调用子类的decode方法
decodeRemovalReentryProtection(ctx, in, out);
if (ctx.isRemoved()) {
//如果节点被移除了,退出循环
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
//目前的字节数不足以解析出结果
break;
} else {
continue;
}
}
if (oldInputLength == in.readableBytes()) {
//没有使用读入的字节,但是却解析出结果,抛出异常
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
//如果每次只解析一条message,退出循环
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}
继续看看decodeRemovalReentryProtection
方法
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
decodeState = STATE_CALLING_CHILD_DECODE;
try {
//调用子类的decode方法
decode(ctx, in, out);
} finally {
boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
decodeState = STATE_INIT;
if (removePending) {
//该节点正在等待被移除,向后传播结果然后移除该节点
fireChannelRead(ctx, out, out.size());
out.clear();
handlerRemoved(ctx);
}
}
}
向后传播ByteBuf
来看看finally块里的fireChannelRead
方法
static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
for (int i = 0; i < numElements; i ++) {
ctx.fireChannelRead(msgs.getUnsafe(i));
}
}
可以看到,这里就是将解析出来的bytebuf一个个向后传播
FixedLengthFrameDecoder
FixedLengthFrameDecoder
是基于固定长度(字节的长度)的解码器,它集成自ByteToMessageDecoder
,在构造函数里传入frameLength
表示长度
来看看它的decode
方法
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
protected Object decode(
@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (in.readableBytes() < frameLength) {
return null;
} else {
return in.readRetainedSlice(frameLength);
}
}
可以看到,解码的逻辑很简单
- 如果当前累加器里可读的字节数小于frameLength,返回null,out里不会添加解析结果
- 如果当前累加器里可读的字节数大于等于frameLength,那么读取frameLength个字节构建新的bytebuf,添加到out结果里
LineBasedFrameDecoder
LineBasedFrameDecoder
是基于行的解码器
先来看看他的几个比较重要的成员变量
//能够解码的最大长度
private final int maxLength;
//超出最大长度后是否要立即抛出异常,默认为false
private final boolean failFast;
//解析出来的结果是否丢弃换行符,默认为true
private final boolean stripDelimiter;
接下来看看它的decode方法
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
同样,它的核心方法在另一个decode方法里
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
//寻找行换行符,如果使用'\r\n'换行,那么位置指向\r
final int eol = findEndOfLine(buffer);
if (!discarding) {
//非丢弃模式
if (eol >= 0) {
//如果找到了行末
final ByteBuf frame;
//计算该行长度(不包括换行符长度)
final int length = eol - buffer.readerIndex();
//根据换行符长度
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
if (length > maxLength) {
//行长度超过最大长度,读入数据并且抛出异常
buffer.readerIndex(eol + delimLength);
fail(ctx, length);
return null;
}
if (stripDelimiter) {
//如果去掉换行符,
frame = buffer.readRetainedSlice(length);
buffer.skipBytes(delimLength);
} else {
frame = buffer.readRetainedSlice(length + delimLength);
}
//返回解析到的结果
return frame;
} else {
//没找到换行符
final int length = buffer.readableBytes();
if (length > maxLength) {
//超出最大长度,将读入的数据都丢弃,进入丢弃模式
discardedBytes = length;
buffer.readerIndex(buffer.writerIndex());
discarding = true;
offset = 0;
if (failFast) {
fail(ctx, "over " + discardedBytes);
}
}
return null;
}
} else {
if (eol >= 0) {
//丢弃模式下,读取到了换行符,退出丢弃模式
final int length = discardedBytes + eol - buffer.readerIndex();
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
buffer.readerIndex(eol + delimLength);
discardedBytes = 0;
discarding = false;
if (!failFast) {
fail(ctx, length);
}
} else {
//丢弃模式下,没有读取到换行符,继续丢弃数据
discardedBytes += buffer.readableBytes();
buffer.readerIndex(buffer.writerIndex());
// We skip everything in the buffer, we need to set the offset to 0 again.
offset = 0;
}
return null;
}
}
逻辑也比较简单
- 非丢弃模式下
- 若读取到了换行符,那么读取出该行然后返回结果
- 若没有读取到换行符,并且累加器内的字节数超过最大长度,那么丢弃这部分数据,进入丢弃模式
- 丢弃模式下
- 若读取到了换行符,丢弃直到换行符的数据,退出丢弃模式
- 若没有读取到换行符,继续丢弃数据
LengthFieldBasedFrameDecoder
LengthFieldBasedFrameDecoder
是基于长度域解码器
这个类有几个比较重要的成员变量
//长度域的起始偏移
private final int lengthFieldOffset;
//长度域的长度
private final int lengthFieldLength;
//长度矫正(有时候长度域存储的值是整个message的长度,需要设置该值得到真正数据段的长度)
private final int lengthAdjustment;
//跳过的起始字节数(有时候长度域前面有header部分,需要设置该值来跳过这部分找到长度域)
private final int initialBytesToStrip;
//长度域前的header长度加上长度域的长度
private final int lengthFieldEndOffset;
更详细的解释可以去查看官方文档
接下来看看他的decode方法,与之前一样,真正的解码逻辑在另一个decode方法里
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (discardingTooLongFrame) {
//丢弃过长的数据
discardingTooLongFrame(in);
}
if (in.readableBytes() < lengthFieldEndOffset) {
//累加器中的可读字节数太少
return null;
}
//计算长度域在累加器里的实际偏移数
int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
//获取无矫正前的真正数据段的长度(即没有将lengthAdjustment考虑进去)
long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
if (frameLength < 0) {
//长度小于0,抛出异常
failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset);
}
//将lengthAdjustment考虑进去,计算整个frame的长度
frameLength += lengthAdjustment + lengthFieldEndOffset;
if (frameLength < lengthFieldEndOffset) {
//整个frame的长度<长度域前的header长度+长度域长度,抛出异常
failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);
}
if (frameLength > maxFrameLength) {
//超出最大长度
exceededFrameLength(in, frameLength);
return null;
}
// never overflows because it's less than maxFrameLength
int frameLengthInt = (int) frameLength;
if (in.readableBytes() < frameLengthInt) {
//累加器内的可读字节数小于frame长度
return null;
}
if (initialBytesToStrip > frameLengthInt) {
//跳过的字节数超过frame长度,抛出异常
failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, initialBytesToStrip);
}
//跳过指定字节数
in.skipBytes(initialBytesToStrip);
// extract frame
int readerIndex = in.readerIndex();
int actualFrameLength = frameLengthInt - initialBytesToStrip;
//解析出frame数据包的数据
ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
//设置累加器的读索引
in.readerIndex(readerIndex + actualFrameLength);
return frame;
}
逻辑比较简单,概括下来就是这几步
- 计算数据包长度的长度
- 处理跳过的字节
- 解析出数据包