【Netty】新しい接続へのアクセス方法


公衆番号へようこそ:【
プログラミングが好きです
バックグラウンドの返信が必要な場合
2019贈呈
1 Tの学习资料よ!!
前文に続き、本は前回【NioEventLoop】に続きます.NioEventLoop実行プロセスを検討する際、IOイベント(新しい接続を含む)を検出し、IOイベントを処理し、すべてのタスクの3つのプロセスを実行します.ここで、IOイベントのうち、保有するselectorによってイベントがポーリングされ、新しい接続が検出される.ここでは同じコードを多重化します.
チャンネルのデザイン
分析を始める前に、まずChannelのデザインを知っておきましょう.
最上位チャネルインタフェースは、読み取り、書き込み、接続、バインドなどのsocketイベントを定義し、AbstractChannelをスケルトンとして使用してこれらの方法を実装します.ビューアメンバー変数、ほとんどの共通のコンポーネントが定義されていることがわかります.
第2層AbstractNioChannelは、NIO、すなわちSelectorによる読み書きイベントの傍受を定義する.メンバー変数にはselector関連のプロパティが保存されます.
第3層の内容は比較的多く,サービス側channel(左にAbstractNioMessageChannelを継承したNioServerSocketChannel)およびクライアントchannel(右にAbstractNioByteChannelを継承したNioSocketChannel)が定義されている.
新しい接続にアクセスする方法
Nettyが新しい接続にどのようにアクセスするかを探り始めました.主に四つの部分に分かれている
1.新しい接続の検出
2.NioSocketChannelの作成
3.スレッドの割り当てとSelectorの登録
4.Selectorへの読み取りイベントの登録
1.新しい接続の検出
Nettyサービス側は、起動時にbossGroup、すなわちNioEventLoopをバインドし、bind()でポートをバインドするときにaccept(新しい接続アクセス)イベントを登録します.イベントをスキャンして処理します.そのため、入り口はNioEventLoop#processSelectedKeys()から始まります.
 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        //    
        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        //    NioEventLoop workGroup     OP_READ,bossGroup OP_ACCEPT
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {

            //              
            unsafe.read();
        }
      }

重要な新しい接続アクセスおよび読み取りイベント処理エントリunsafe.read();a).ここでのunsafeは、Channel作成プロセスの際に親AbstractChannel#AbstractChannel()の構造方法を呼び出し、pipelineとともに初期化される.
  protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

サービス側:unsafeはNioServerSockeChanne lの親AbstractNioMessageChannel#newUnsafe()で作成され、対応するAbstractNioMessageChannelの内部クラスNioMessageUnsafeが表示されます.
クライアント:unsafeがNioSocketChannelの親AbstractNioUnsafe#newUnsafe()を作成すると、AbstractNioByteChannelの内部クラスNioByteUnsafeに対応します
b).unsafe.read() NioMessageUnsafe.read()の主な操作は以下の通りです.
1.jdkの最下位のコードをループしてchannelを作成し、nettyのNioSocketChannelでパッケージし、新しい接続がチャネルに正常にアクセスしたことを示します.
2.取得したすべてのチャネルを1つのコンテナに格納し、アクセスの接続数を検出し、デフォルトでは1回に16の接続を接続する
3.コンテナ内のチャネルを巡回し、メソッドfireChannelReadを順次呼び出し、4.FireChannelReadComplete,fireExceptionCaughtは、対応する伝播イベントをトリガする.
private final class NioMessageUnsafe extends AbstractNioUnsafe {
        //         
        private final List readBuf = new ArrayList();

        @Override
        public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();

            //          
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    //while    doReadMessages()       
                    do {
                        //  jdk   channel,   readBuf  
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }
                        //           totalMessages,        16   ,    
                        allocHandle.incMessagesRead(localRead);
                        
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }
                
                //  readBuf          :ChannelRead    
                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                //    
                readBuf.clear();
                allocHandle.readComplete();
                //      :ChannelReadComplete,        
                pipeline.fireChannelReadComplete();

                if (exception != null) {
                    closed = closeOnReadError(exception);
                    //      :exceptionCaught,    
                    pipeline.fireExceptionCaught(exception);
                }

                if (closed) {
                    inputShutdown = true;
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }

このキーコード論理の int localRead = doReadMessages(readBuf);は、jdk最下位のchannelを作成し、NioSocketChannelでパッケージし、受信したコンテナに追加して保存し、カウントを返します.
protected int doReadMessages(List buf) throws Exception {
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
  // jdk   channel   netty channel,           
                //this    channel
                buf.add(new NioSocketChannel(this, ch));
 //                ,   
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }

2.NioSocketChannelの作成
IOイベントを検出して新しい接続をポーリングすると、接続アクセスイベントが正常に検出された後、NioServerSocketChannel#doReadMessages()メソッドが呼び出され、NioSocketChannel、すなわちクライアントchannelを作成するプロセスが行われる.
NioSocketChannelの主な仕事について説明します.元のコードを表示するには、親構造メソッドを呼び出し、NioSocketChannelConfigをインスタンス化します.
public NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
        //     NioSocketChannelConfig
        config = new NioSocketChannelConfig(this, socket.socket());
    }

1)、NioSocketChannelの親の構築方法を表示します.主に、クライアントが登録した読み取りイベントを保存し、channelをメンバー変数とし、ブロックモードを非ブロックに設定します.
 public NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
        //     NioSocketChannelConfig
        config = new NioSocketChannelConfig(this, socket.socket());
    }
    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        //         :   channel    
        super(parent, ch, SelectionKey.OP_READ);
    }

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        //     channel     
        this.ch = ch;
        //              
        this.readInterestOp = readInterestOp;
        try {
            //          
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }

最後に親クラスを呼び出す構造方法は、クライアントchannelに対応するサービス側channelと、channelのidと2つのコンポーネントunsafeとpipelineを設定することである.
 protected AbstractChannel(Channel parent) {
        //parent       channel    channel(               )
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

2)、NioSocketChannelConfigのインスタンス化を参照してください.主にjavaSocketが保存する、setTcpNoDelay(true);によってtcpのNagleアルゴリズムが禁止されているが、できるだけ小さなパケットを大きな送信に統合し、遅延を低減することを目的としている.
 private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {
            super(channel, javaSocket);
            calculateMaxBytesPerGatheringWrite();
        }

    public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) {
        super(channel);
        if (javaSocket == null) {
            throw new NullPointerException("javaSocket");
        }
        //  socket
        this.javaSocket = javaSocket;

        // Enable TCP_NODELAY by default if possible.
        if (PlatformDependent.canEnableTcpNoDelayByDefault()) {
            try {
                //  Nagle  ,                         
                setTcpNoDelay(true);
            } catch (Exception e) {
                // Ignore.
            }
        }
    }

3.スレッドの割り当てとSelectorの登録
サービス側が初期化を開始するとServerBootstrap#init()で、主にいくつかのパラメータの構成が行われています.ここで、childGroup,childOptions,childAttrs,childHandler等のパラメータについては個別に構成されている.パラメータとしてServerBootstrapAcceptorとともに特殊なhandleとしてpipelineにカプセル化される.ServerBootstrapAcceptorのうちeventLoopworkGroupである.
public class ServerBootstrap extends AbstractBootstrap {
  //       .............
    @Override
    void init(Channel channel) throws Exception {

        //  AbstractBootstrap.option
        final Map, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }

        //  AbstractBootstrap.attr
        final Map, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey key = (AttributeKey) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }
        //  pipeline
        ChannelPipeline p = channel.pipeline();

        //  ServerBootstrapAcceptor    
        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;

        final Entry, Object>[] currentChildOptions;
        final Entry, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
        }

        p.addLast(new ChannelInitializer() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                //  AbstractBootstrap.handler
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        //  ServerBootstrapAcceptor,  Handle  HeadContext
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

//       .............
}

サービス側pipeline全体の構造を下図に示す.bossGroupはIOイベントの検出と処理を制御し、bossGroup全体に対応するpipelineはヘッダ(HeadContext)テール(TailContext)と中部のServerBootstrap.ServerBootstrapAcceptorのみを含む.
新しい接続アクセス時にAbstractNioMessageChannel.NioMessageUnsafe#read()メソッドが呼び出され、最終的にfireChannelRead()が呼び出され、次のHandlerのchannelReadメソッドがトリガーされる.このHandlerはServerBootstrapAcceptorです
サーバBootstrapの内部クラスであり、ChannelInboundHandlerAdapterから継承されています.ChannelInboundHandlerですその中でchannelReadは主に以下のことをしました.
1.クライアントチャネルのpipelineにchildHandlerを追加する
2.クライアントTCP関連属性childOptionsとカスタム属性childAttrsの設定
3.workGroup NioEventLoopを選択してSelectorを登録する
1)、クライアントchannelのpipelineにchildHandlerを追加する
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

        private final EventLoopGroup childGroup;
        private final ChannelHandler childHandler;
        private final Entry, Object>[] childOptions;
        private final Entry, Object>[] childAttrs;
        private final Runnable enableAutoReadTask;

        ServerBootstrapAcceptor(
                final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
                Entry, Object>[] childOptions, Entry, Object>[] childAttrs) {
            this.childGroup = childGroup;
            this.childHandler = childHandler;
            this.childOptions = childOptions;
            this.childAttrs = childAttrs;

       //       。。。。。 
        @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            // channel          channel
            final Channel child = (Channel) msg;

            //  childHandler
            child.pipeline().addLast(childHandler);

            //  TCP    :childOptions
            setChannelOptions(child, childOptions, logger);

            //       :childAttrs
            for (Entry, Object> e: childAttrs) {
                child.attr((AttributeKey) e.getKey()).set(e.getValue());
            }

            try {
                //  NioEventLoop   Selector
                childGroup.register(child)
                        .addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
      //       。。。。。
    }

クライアントchannelのpipelineはchildHandlerを追加し、サービス側EchoServer作成プロセスにおいてchildHandlerの場合、ChannelInitializerのカスタムインスタンスを使用した.initChannelの方法をカバーし、方法を変更してpipelineを取得し、具体的なHandlerを追加した.ChannelInitializerの追加ロジック、handlerAddedメソッドを参照してください.実はinitChannelロジックでは、まずユーザコード実行initChannelにコールバックし、ユーザコードはHandlerを追加する追加操作を実行し、その後ChannelInitializer自身をpipelineから削除する.
public abstract class ChannelInitializer extends ChannelInboundHandlerAdapter {

 @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            // This should always be true with our current DefaultChannelPipeline implementation.
            // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
            // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
            // will be added in the expected order.

            //   Channel
            if (initChannel(ctx)) {

                // We are done with init the Channel, removing the initializer now.
                removeState(ctx);
            }
        }
    }

    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.add(ctx)) { // Guard against re-entrance.
            try {
                //       
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
                // We do so to prevent multiple calls to initChannel(...).
                exceptionCaught(ctx, cause);
            } finally {
                ChannelPipeline pipeline = ctx.pipeline();
                if (pipeline.context(this) != null) {
                    //    
                    pipeline.remove(this);
                }
            }
            return true;
        }
        return false;
    }

}

2)、クライアントTCP関連属性childOptionsおよびカスタム属性childAttrsを設定することは、ServerBootstrapAcceptor#init()の方法ですでに実現されている
3)、workGroupはNioEventLoopを選択し、Selectorを登録するAbstractBootstrap#initAndRegister()メソッドから開始し、ソースコードを追跡するとAbstractUnsafe#register()メソッドに到達する
 protected abstract class AbstractUnsafe implements Unsafe {
      //       。。。。。
  @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }
      //       。。。。。
}

最後に、AbstractNioUnsafe#doRegister()メソッドが呼び出され、jdkのjavaChannel().registerによって登録機能が完了する.

    protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
      //       。。。。。
  @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }
      //       。。。。。
}

4.Selectorへの読み取りイベントの登録
a)、入口:ServerBootstrap.ServerBootstrapAcceptor#channelRead()#childGroup.register();
  public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;

            child.pipeline().addLast(childHandler);

            setChannelOptions(child, childOptions, logger);

            for (Entry, Object> e: childAttrs) {
                child.attr((AttributeKey) e.getKey()).set(e.getValue());
            }

            try {
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }

b)、実際にAbstractChannel.AbstractUnsafe#register0()が呼び出され、チャネルアクティブ化イベントがトリガーされる.
  //        ,  HeadContent 
   pipeline.fireChannelActive();

c)、pipelineの頭部から始まり、すなわちDefaultChannelPipeline.HeadContext#channelActive()によってreadIfIsAutoRead()がトリガーされる.
 @Override
  public void channelActive(ChannelHandlerContext ctx) {
            ctx.fireChannelActive();

            readIfIsAutoRead();
  }

d)、読み出しイベントは、末尾のTailContent#read()からトリガーされ、ctxが順次実行する.read()は,末尾から各outboundHandlerのread()イベントがトリガーされる.頭まで.
  @Override
    public final ChannelPipeline read() {
        tail.read();
        return this;
    }


    @Override
    public ChannelHandlerContext read() {
        //     outboundhandler
        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();

        //      read  
        if (executor.inEventLoop()) {
            next.invokeRead();
        } else {
            Tasks tasks = next.invokeTasks;
            if (tasks == null) {
                next.invokeTasks = tasks = new Tasks(next);
            }
            executor.execute(tasks.invokeReadTask);
        }

        return this;
    }

e)、ヘッダHeadContext#read()に入り、最終的にselectionKeyを変更し、selectorに読み取りイベントを登録する
HeadContext#read()
       @Override
        public void read(ChannelHandlerContext ctx) {
            unsafe.beginRead();
        }

AbstractChannel#beginRead()
  @Override
        public final void beginRead() {
            assertEventLoop();

            if (!isActive()) {
                return;
            }

            try {
                doBeginRead();
            } catch (final Exception e) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireExceptionCaught(e);
                    }
                });
                close(voidPromise());
            }
        }

AbstractNioMessageChannel#doBeginRead
  @Override
    protected void doBeginRead() throws Exception {
        if (inputShutdown) {
            return;
        }
        super.doBeginRead();
    }

AbstractNioChannel#doBeginRead()
  @Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

参考記事:Jorgezhong
まとめ
Nettyが新しい接続の基本プロセスにどのようにアクセスするかは上述したように、間違いがあれば、ご指摘ください.まず前の2編から見たほうが理解しやすいことをお勧めします.
【Netty】サービス側とクライアント学習NioEventLoop
最後に
Java、ビッグデータに興味があればQRコードを長く押して注目してください.私はあなたたちに価値をもたらすように努力します.あなたに少しでも役に立つと思う人は、「いいね」や「転送」を手伝ってください.公式アカウント「愛コード」に注目し、2019に返信するには関連資料がありますよ.