吃透Netty源码系列三十八之ByteToMessageDecoder详解

Harmony ·
更新时间:2024-11-14
· 938 次阅读

吃透Netty源码系列三十八之ByteToMessageDecoder详解ByteToMessageDecoder重要属性MERGE_CUMULATOR合并累加器COMPOSITE_CUMULATOR复合累加器抽象方法decodechannelRead读方法callDecode解码fireChannelRead传递消息列表中的消息decodeRemovalReentryProtection调用子类来解码FixedLengthFrameDecoder的decodechannelReadComplete读完成方法discardSomeReadBytes丢弃已读数据decodeLast最后解码 ByteToMessageDecoder

字节到消息的编码器,因此应该是实现入站操作的,这类解码器处理器都不是共享的,因为需要存还没有解码完的数据,还有各种状态,是独立的,不能进行共享。
在这里插入图片描述

重要属性

先看看他的一些属性:

//状态码 private static final byte STATE_INIT = 0;//初始状态 private static final byte STATE_CALLING_CHILD_DECODE = 1;//正在调用子类解码 private static final byte STATE_HANDLER_REMOVED_PENDING = 2;//处理器待删除 ByteBuf cumulation;//累加缓冲区 private Cumulator cumulator = MERGE_CUMULATOR;//默认是合并累加器 private boolean singleDecode;//是否只解码一次 private boolean first;//是否是第一次累加缓冲区 private boolean firedChannelRead;//自动读取是false的时候,是否要去调用ChannelHandlerContext的read()来设置监听读事件,可能没读完 //状态 private byte decodeState = STATE_INIT; private int discardAfterReads = 16;//读取16个字节后丢弃已读的 private int numReads;//cumulation读取数据的次数

内部会维护一个状态decodeState ,以便于如果在执行解码的时候处理器上下文被删除了,可以及时响应。
还有一个累加缓冲区,如果有不能拼成一个消息的数据会放入这个缓冲区里,等待下一次继续拼。当然累加缓冲区怎么累加,就需要有累加器,默认是合并累加器MERGE_CUMULATOR

MERGE_CUMULATOR合并累加器

主要是做一般缓冲区的合并,直接将新的缓冲区拷贝到累加缓冲区中。

public static final Cumulator MERGE_CUMULATOR = new Cumulator() { @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { if (!cumulation.isReadable() && in.isContiguous()) {//累计的不可读(比如为空缓冲区),且新的是连续的,不是符合缓冲区,释放老的,返回新的 cumulation.release(); return in; } try { final int required = in.readableBytes(); if (required > cumulation.maxWritableBytes() || (required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) || cumulation.isReadOnly()) {//扩容了 return expandCumulation(alloc, cumulation, in); } cumulation.writeBytes(in, in.readerIndex(), required);//将in写入 in.readerIndex(in.writerIndex());//in不可读了 return cumulation; } finally { in.release();//返回前要释放in } } }; COMPOSITE_CUMULATOR复合累加器

另一个是复合累加器,也就是处理复合缓冲区,默认累加缓冲区也会是复合缓冲区。如果添加进来的缓冲区不可读,那就什么都不做,也就是复合缓冲区的累加方式。

public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() { @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { if (!cumulation.isReadable()) {//不可读了,直接返回in cumulation.release(); return in; } CompositeByteBuf composite = null; try { if (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) {//累计的是复合缓冲区且无其他引用 composite = (CompositeByteBuf) cumulation; if (composite.writerIndex() != composite.capacity()) {//更新容量到写索引处 composite.capacity(composite.writerIndex()); } } else {//如果不是复合缓冲区,就创建一个复合缓冲区把累计的添加进来 composite = alloc.compositeBuffer(Integer.MAX_VALUE).addFlattenedComponents(true, cumulation); } composite.addFlattenedComponents(true, in);//再添加in in = null; return composite; } finally { if (in != null) {//有异常,要释放缓冲区 in.release(); if (composite != null && composite != cumulation) {//有新的缓冲区申请的话也要释放 composite.release(); } } } } }; 抽象方法decode

其实有一个抽象方法需要子类实现,那就是具体的解码方法,参数in就是累加缓冲区,out可以理解为一个列表,存放解码后的对象。

protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception; channelRead读方法

解码器也是一个处理器,只是在业务处理器前面做解码用,当然也是在读数据的地方做处理啦。CodecOutputList暂时不用管,就当一个列表,存放解码出来的消息就行。其实流程就是将新来的缓冲区
msg加到累加的缓冲区cumulation中,然后返回的又赋值给cumulation,这样就做到了合并了,然后去进行解码,解码的结果放入列表out 中。最后再进行资源的释放,往后传递消息和列表的回收。

@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) {//只处理字节缓冲区类型的 CodecOutputList out = CodecOutputList.newInstance(); try { first = cumulation == null; cumulation = cumulator.cumulate(ctx.alloc(), first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);//累加 callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Exception e) { throw new DecoderException(e); } finally { if (cumulation != null && !cumulation.isReadable()) {//不为空也不可读,要释放 numReads = 0; cumulation.release(); cumulation = null; } else if (++ numReads >= discardAfterReads) {//读取数据的次数大于阈值,则尝试丢弃已读的,避免占着内存 numReads = 0; discardSomeReadBytes(); } int size = out.size(); firedChannelRead |= out.insertSinceRecycled();//有被添加或者设置,表是有读过了 fireChannelRead(ctx, out, size);//尝试传递数据 out.recycle(); } } else { ctx.fireChannelRead(msg);//其他类型继续传递 } } callDecode解码

只要判断新的缓冲区in还有可读的,就进行解码,当然最开始消息列表out是空的,所以就进行子类来解码decodeRemovalReentryProtection,解码后看是否真正读取了缓冲区的内容,如果没读,说明不符合子类解码器的要求,就跳出循环了。如果能读取,就判断是否只解码一次,是就跳出,不是就继续读取来解码,解码好的消息会马上传递给后面,并把消息列表清空,当然这里不一定一次解码1个消息,也可能一次很多个。当然每次完成解码或者传递消息后要进行上下文是否被移除的检查,如果被移除了,就不能再进行处理了。

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List out) { try { while (in.isReadable()) {//有可读的 int outSize = out.size();//数量 if (outSize > 0) {//有消息解码出来就先传递了 fireChannelRead(ctx, out, outSize);//有解码好的数据就传递给后面 out.clear();//清空 if (ctx.isRemoved()) {//上下文被删除了就不处理了 break; } outSize = 0; } //继续解码 int oldInputLength = in.readableBytes();//还以后多少字节可读 decodeRemovalReentryProtection(ctx, in, out);//解码 if (ctx.isRemoved()) { break; } if (outSize == out.size()) {//没有生成新的消息,可能要求不够无法解码出一个消息 if (oldInputLength == in.readableBytes()) {//没有读取数据 break; } else { continue; } } if (oldInputLength == in.readableBytes()) {//解码器没有读数据 throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); } if (isSingleDecode()) {//是否每次只解码一条,就返回 break; } } } catch (DecoderException e) { throw e; } catch (Exception cause) { throw new DecoderException(cause); } } fireChannelRead传递消息列表中的消息

这个方法是个用来传递消息列表中的所有消息的,判断消息列表是不是CodecOutputList类型,是的话就调用相应的获取方法getUnsafe来传递,这个获取消息的方法可能是不安全的,因为没做索引的越界检查,可能会越界。如果是一般的列表,就直接调用get方法获得。

static void fireChannelRead(ChannelHandlerContext ctx, List msgs, int numElements) { if (msgs instanceof CodecOutputList) {//如果是CodecOutputList类型的 fireChannelRead(ctx, (CodecOutputList) msgs, numElements); } else {//正常获取对象,传递下去 for (int i = 0; i < numElements; i++) { ctx.fireChannelRead(msgs.get(i));//传递每一个 } } } // 传递CodecOutputList中的每一个对象 static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) { for (int i = 0; i < numElements; i ++) { ctx.fireChannelRead(msgs.getUnsafe(i)); } } Object getUnsafe(int index) { return array[index]; } decodeRemovalReentryProtection调用子类来解码

主要是调用子类实现的decode方法来解码,最后会考虑处理器是否被删除了,做一些处理。

final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { decodeState = STATE_CALLING_CHILD_DECODE;//设置为子类解码 try { decode(ctx, in, out);//调用子类解码 } finally { boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;//是否待删除状态 decodeState = STATE_INIT;//处理完了设置为初始化 if (removePending) {//如果有被设置待删除状态,就马上处理 fireChannelRead(ctx, out, out.size());//把数据传出去 out.clear();//清空 handlerRemoved(ctx);//删除 } } } FixedLengthFrameDecoder的decode

举个例子,拿这个固定长的来看看他的解码方法,其实就是调用自定义的解码方法decode,然后把结果放进消息队列out中。具体的解码就是看可读数据是否大于等于固定长,如果是,就进行缓冲区的保留切片,切出固定长的缓冲区,这里为什么要保留切片呢,因为切片是共享原缓冲区的数据的,如果源缓冲区用完了可能被释放,所以需要保留一下,增加引用计数,当然在切片释放的时候,也会释放源缓冲区的。注意如果没达到解码器要求的,可能不会去读取缓冲区数据。

@Override protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { Object decoded = decode(ctx, in); if (decoded != null) { out.add(decoded); } } protected Object decode( @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception { if (in.readableBytes() < frameLength) {//如果可读字节的小于固定长度,什么都不做 return null; } else { return in.readRetainedSlice(frameLength);//返回的是切片,会增加in引用计数,防止被回收了 } } channelReadComplete读完成方法

当数据读取完成的时候,会尝试去丢弃discardSomeReadBytes累加缓冲区的已读信息,虽然可能要进行拷贝消耗点新能,但是放在那里浪费内存,所以就先丢弃了。之后判断是否有读取过缓存区的内容,如果没读到数据(可能没达到解码器要求,不读取数据),且没设置自动去读的,就手动设置一次监听读事件,可能后面还有部分没发过来,发过来了就可以解码拼成一个完整消息了。最后在传递读完成事件。

@Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { numReads = 0; discardSomeReadBytes(); if (!firedChannelRead && !ctx.channel().config().isAutoRead()) {//如果没有读到数据,且没有自动开启读,就设置读事件 ctx.read(); } firedChannelRead = false; ctx.fireChannelReadComplete(); } discardSomeReadBytes丢弃已读数据

如果缓冲区不为空,而且没有别的引用指向他,就丢弃已读的数据。

protected final void discardSomeReadBytes() { if (cumulation != null && !first && cumulation.refCnt() == 1) {//当引用值有1的时候丢弃,否则用户可能有其他用就不能直接丢弃 cumulation.discardSomeReadBytes(); } } decodeLast最后解码

在通道失效之前,会进行最后一次解码,以便于取出剩下的数据解码,当然如果没有数据,那等于什么都没做:

protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { if (in.isReadable()) {//如果还能读的话把剩下的解码 decodeRemovalReentryProtection(ctx, in, out); } }

其他的一些方法就不多说了,理解了这些其他的都没太大难度了,后面再介绍下他的一些常用子类是怎么实现的。

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。


作者:王伟王胖胖



netty

需要 登录 后方可回复, 如果你还没有账号请 注册新账号