Nettyソース分析-TCPの接着パケットの分解問題をどのように解決するか

9878 ワード

実際のネットワークアプリケーションでは、私たちが受信および送信したデータは、実際のアプリケーションのデータ型単位(例えば、Httpデータ体、またはThriftObject)である.一方、Socketは、TCPトランスポート層のデータを処理し、受信または送信されたTCPパケットのうち、1つのThriftObject、または複数のThriftObject、ThriftObjectの一部、さらには複数のThriftObjectの複数の部分から構成される可能性がある.これがTCPのボンド半包の問題です.
Nettyは、ChannelHandlerを埋め込むことによって実現されるTCP半パケットおよび粘着パケットの問題を容易に処理するメカニズムを提供する.
次に、TCPの接着および分解の問題をNettyでどのように処理するかを簡単なコード例で見てみましょう.コードにFixedLengthFrameDecoderのタイプのChannelHandlerが追加されていることがわかり、このコードを追加する効果は、毎回1024の長さのバイトデータを下層ChannelHandlerのinput処理対象として切り取ることである.
    public void bind(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        //  TCP      
                        ch.pipeline().addLast(new FixedLengthFrameDecoder(1024));
                    }
                })
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new SimpleNettyServerHandler());
                    }
                });
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

1.ソース分析FixedLengthFrameDecoderを例にとると,その実装は非常に簡単で,クラスByteToMessageDecoderから継承され,親クラスの抽象的な方法decode(ChannelHandlerContext ctx, ByteBuf in, List out)を実装した.したがって,NettyによるTCPボンドパケットの分解処理を解析するには,ByteToMessageDecoderが核心論理である.
public class FixedLengthFrameDecoder extends ByteToMessageDecoder{
 @Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List out)
    {.....}
}

1.1プロセスの概要
4つのコアアプローチ:B y t e ToMessageDecoder channelRead(..) B y t e ToMessageDecoder callDecode(..) B y t e ToMessageDecoder fireChannelRead(..) FixedLengthFrameDecoder decode(..)method
Class
説明
void channelRead( ChannelHandlerContext ctx, Object msg)
ByteToMessageDecoder
エントランスメソッドでは、インパラメータmsgはByteBufになります.主なプロセスは、新しいByteBuf cumulationに組み立てられ、このcumulationをcallDecodeすることです.
void callDecode( ChannelHandlerContext ctx, ByteBuf in, List out)
ByteToMessageDecoder
ByteBufを復号化し、JavaObjectのセットをoutに解析すると、実際のdecodeメソッドが呼び出され、fireChannelReadメソッドがChannelHandlerの転送に呼び出されます.
void fireChannelRead( ChannelHandlerContext ctx, List msgs, int numElements)
ByteToMessageDecoder
この方法は解析生成されたJavaObjectに対して下層ChannelHandlerの伝達を行う.
decode( ChannelHandlerContext ctx, ByteBuf in, List out)
FixedLengthFrameDecoder
実際にdecodeを行う方法は、ByteBufを一定の長さでObjectのセットに分割します.
1.2 channelReadメソッド解析
記事Nettyソース分析-ChannelPipelineの分析により、channelReadメソッドの呼び出しはChannelPipelineであり、パラメータmsgはByteBufである.
ステップ1:まず、復号化されるべき生成されたオブジェクトを格納するために、空のCodecOutputListがインスタンス化される.第2歩:核心はcumulationを付与することにあり、cumulationのタイプもByteBufで、それはパラメータのmsgと何が違いますか?cumulationがnullの場合、msgのアドレスはcumulationに直接割り当てられます.そうしないと、cumulator.cumulate(ctx.alloc(), cumulation, data)メソッドがcumulationに値を返します.メソッドの説明「Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes.」を見てください.msgとcumulationをmergeしていました.これは、2つのTCPパケットのデータを1つのデータに接続することに相当する.ステップ3:cumulationを復号するためにcallDecodeを呼び出す.
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {
                    // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                    // See https://github.com/netty/netty/issues/4275
                    numReads = 0;
                    discardSomeReadBytes();
                }

                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

1.3 callDecodeメソッド解析
callDecodeのコアプロセスは、ByteBufを遍歴し、遍歴中にdecodeメソッドを呼び出してObjectオブジェクトを解析し、解析したオブジェクトに対してfireChannelReadメソッドを実行し、Pipelineの伝達を保証することです.コードコメントでは、プロシージャが分析されます.
    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List out) {
        try {
            while (in.isReadable()) {
                int outSize = out.size();
                if (outSize > 0) {
                    //         ,       Pipeline     
                    fireChannelRead(ctx, out, outSize);
                    out.clear();
                    if (ctx.isRemoved()) {
                        break;
                    }
                    outSize = 0;
                }
                int oldInputLength = in.readableBytes();
                //      decode         
                decodeRemovalReentryProtection(ctx, in, out);
                if (ctx.isRemoved()) {
                    break;
                }
                //           ,     ByteBuf    ,    ByteBuf       ,   break。
                if (outSize == out.size()) {
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {
                        continue;
                    }
                }
                //      ,       ByteBuf,     
                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                                    ".decode() did not read anything but decoded a message.");
                }

                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable cause) {
            throw new DecoderException(cause);
        }
    }

1.4 fireChannelReadメソッド解析fireChannelRead(ChannelHandlerContext ctx, List msgs, int numElements)メソッドのパラメータは復号化後に生成されたリストであり、これらのObjectを巡回し、それぞれctx.fireChannelRead(final Object msg)を呼び出してChanelPipelineの転送を行う.
下にはctx.fireChannelRead(final Object msg)のコード実装もリストされており、findContextInbound()はctxの次のAbstractChannelHandlerContextを見つけ、ChannelPipelineを後で渡す.
static void fireChannelRead(ChannelHandlerContext ctx, List msgs, int numElements) {
        if (msgs instanceof CodecOutputList) {
            fireChannelRead(ctx, (CodecOutputList) msgs, numElements);
        } else {
            for (int i = 0; i < numElements; i++) {
                ctx.fireChannelRead(msgs.get(i));
            }
        }
    }
abstract class AbstractChannelHandlerContext{
    @Override
    public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }
}

1.5 decode方法分析
次はクラスFixedLengthFrameDecoderのコード実装です.現在のByteBufの長さが1つのFrameの長さに足りないと判断し、処理しないとFrameが復号されてList outに追加され、ByteBufポインタがframeLengthの長さに前方に移動するのが簡単であることがわかります.
    @Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }
    }
    protected Object decode(
            @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        if (in.readableBytes() < frameLength) {
            return null;
        } else {
            return in.readRetainedSlice(frameLength);
        }
    }

2.その他の実装
上の例では,TCPボンドパケットの分解を解決する一例として,固定長に分割されたFrameについて述べた.実際のアプリケーションでは、HttpアプリケーションではHttpRequestを復号する必要があり、thrift RPC呼び出しではThriftObjectを復号する必要があるなど、アプリケーション層のビジネスエンティティタイプに応じて異なるdecode復号が行われる.
実際のNettyは、よく使われるHttpRequestDecoderのように、Httpオブジェクトを復号するのに役立ち、XmlDecoderはXMLオブジェクトを復号するのに役立ち、jsonオブジェクト復号、WebSocketオブジェクト復号などもあります.
Nettyがすでに提供しているDecoderがあなたの要求を満たすことができなければ、あなたも自分のDecoderを実現することができます.プロセスは非常に簡単で、ByteToMessageDecoderクラスを継承し、抽象的な方法decode(ChannelHandlerContext ctx, ByteBuf in, List out)を実現するだけである.