NettyでのServer Bootstrap初期化ソース解析

23452 ワード

前言
以前の記事ではNettyのスレッドプールNioEventLoopGroupの初期化プロセスを紹介していましたが、今回はNettyで同様に非常に重要なものであるServerBootstrapを分析します.
ServerBootstrap使用フラグメント
        ThreadFactory bossThreadFactory = new ThreadFactoryBuilder().setNameFormat(getServerName() + " Server Acceptor NIO Thread#%d").build();
        ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat(getServerName() + " Server Reactor NIO Thread#%d").build();

        this.bossGroup = new NioEventLoopGroup(numberOfThreads, bossThreadFactory);
        this.workerGroup = new NioEventLoopGroup(numberOfThreads, workerThreadFactory);

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast("readTimeoutHandler", new ReadTimeoutHandler(DeviceServerListener.this.timeoutSeconds));
                pipeline.addLast("lineBasedFrameDecoder-" + maxLength, new LineBasedFrameDecoder(Integer.parseInt(maxLength)));//   ('
') ByteBuf pipeline.addLast("stringPluginMessageDecoder", new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast("stringToByteEncoder", new StringToByteEncoder());// JSON ByteBuf pipeline.addLast("deviceMessageDecoder", new DeviceMessageDecoder());// JSON deviceMessage pipeline.addLast("deviceMessageEncoder", new DeviceMessageEncoder());// deviceMessage JSON pipeline.addLast("deviceHeartBeatResponseHandler", new DeviceHeartBeatResponseHandler(heartTime)); pipeline.addLast("deviceAuthResponseHandler", new DeviceAuthResponseHandler(DeviceServerListener.this.timeoutSeconds, DeviceServerListener.serverInstanceName)); pipeline.addLast("deviceMessageHandler", new DeviceMessageHandler()); // log.debug("Added Handler to Pipeline: {}", pipeline.names()); } }).option(ChannelOption.SO_BACKLOG, 1024).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT).childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT); // Start the server. Bind and start to accept incoming connections. this.channelFuture = bootstrap.bind(serverPort).sync();

これはNettyサービス側の比較標準の初期化フラグメントであり、ServerBootstrapが非常に重要なシーンを持っていることがわかります.これはNettyのイニシエータのように、NioEventLoopGroupが以前に分析されていたので、ServerBootstrapの初期化を直接見てみましょう.
ソース解析
以上のフラグメントから、初期化時にServerBootstrapの非パラメトリック構造関数によってオブジェクトが作成され、この構造関数には何の操作もないので分析しないことがわかります.次に、このオブジェクトのチェーン呼び出しbootstrap.group().channel().childHandler().option()の列を見てみましょう.まずはgroup()メソッド
    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        super.group(parentGroup);
        if (childGroup == null) {
            throw new NullPointerException("childGroup");
        }
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = childGroup;
        return this;
    }

ここで、入力されたchildGroupオブジェクトは、ServerBootstrapchildGroup属性に割り当てられ、親AbstractBootstrapgroup()メソッドが呼び出される
    public B group(EventLoopGroup group) {
        if (group == null) {
            throw new NullPointerException("group");
        }
        if (this.group != null) {
            throw new IllegalStateException("group set already");
        }
        this.group = group;
        return (B) this;
    }

ここから、この方法は、入力されたparentGroupgroup属性に付与することが分かる.
次にchannel()の方法を見てみましょう
    public B channel(Class extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new BootstrapChannelFactory(channelClass));
    }

ここでchannelFactory(new BootstrapChannelFactory(channelClass))を呼び出して、ついてきます.
    public B channelFactory(ChannelFactory extends C> channelFactory) {
        if (channelFactory == null) {
            throw new NullPointerException("channelFactory");
        }
        if (this.channelFactory != null) {
            throw new IllegalStateException("channelFactory set already");
        }

        this.channelFactory = channelFactory;
        return (B) this;
    }

ここでは、AbstractBootstrapクラスのchannelFactory属性の付与が完了し、付与対象はnew BootstrapChannelFactory(channelClass)である.
public abstract class AbstractBootstrap, C extends Channel> implements Cloneable

ここで、Cは、AbstractBootstrapのクラスに従ってChannelのサブクラスとして定義されるので、実際に入力されるchannelClassの値はNioServerSocketChannel.classである.
次にchildHandler()の方法を見てみましょう
    public ServerBootstrap childHandler(ChannelHandler childHandler) {
        if (childHandler == null) {
            throw new NullPointerException("childHandler");
        }
        this.childHandler = childHandler;
        return this;
    }

操作が多すぎず、ServerBootstrapchildHandler属性にのみ値を付与しているが、ここに伝わるchildHandlerは少し複雑である.受信されたオブジェクトはnew ChannelInitializer()であり、受信および送信の双方向メッセージを処理するための処理チェーンを初期化するinitChannel(SocketChannel ch)の方法が同時に上書きされている.この方法は比較的重要であり、その後、単独で分析され、ここでは詳細に説明しない.
最後にoption()の方法を見てみましょう
    public  B option(ChannelOption option, T value) {
        if (option == null) {
            throw new NullPointerException("option");
        }
        if (value == null) {
            synchronized (options) {
                options.remove(option);
            }
        } else {
            synchronized (options) {
                options.put(option, value);
            }
        }
        return (B) this;
    }

ここでは、options配列を維持し、設定する項目を配列に配置します.定義は次のとおりです.
private final Map, Object> options = new LinkedHashMap, Object>();

ここで,オブジェクトを入れるkey値はChannelOption>オブジェクトに限定されていることが分かるが,おおよそ次のようなものがある.
    public static final ChannelOption ALLOCATOR = valueOf("ALLOCATOR");//  PooledByteBufAllocator UnpooledByteBufAllocator         (4.1       )
    public static final ChannelOption RCVBUF_ALLOCATOR = valueOf("RCVBUF_ALLOCATOR");//     
    public static final ChannelOption MESSAGE_SIZE_ESTIMATOR = valueOf("MESSAGE_SIZE_ESTIMATOR");

    public static final ChannelOption CONNECT_TIMEOUT_MILLIS = valueOf("CONNECT_TIMEOUT_MILLIS");
    public static final ChannelOption MAX_MESSAGES_PER_READ = valueOf("MAX_MESSAGES_PER_READ");
    public static final ChannelOption WRITE_SPIN_COUNT = valueOf("WRITE_SPIN_COUNT");
    public static final ChannelOption WRITE_BUFFER_HIGH_WATER_MARK = valueOf("WRITE_BUFFER_HIGH_WATER_MARK");
    public static final ChannelOption WRITE_BUFFER_LOW_WATER_MARK = valueOf("WRITE_BUFFER_LOW_WATER_MARK");

    public static final ChannelOption ALLOW_HALF_CLOSURE = valueOf("ALLOW_HALF_CLOSURE");
    public static final ChannelOption AUTO_READ = valueOf("AUTO_READ");

    public static final ChannelOption SO_BROADCAST = valueOf("SO_BROADCAST");//         
    public static final ChannelOption SO_KEEPALIVE = valueOf("SO_KEEPALIVE");//          
    public static final ChannelOption SO_SNDBUF = valueOf("SO_SNDBUF");//       
    public static final ChannelOption SO_RCVBUF = valueOf("SO_RCVBUF");//       
    public static final ChannelOption SO_REUSEADDR = valueOf("SO_REUSEADDR");//               
    public static final ChannelOption SO_LINGER = valueOf("SO_LINGER");//    close()   ,        
    public static final ChannelOption SO_BACKLOG = valueOf("SO_BACKLOG");//                     

    public static final ChannelOption SO_TIMEOUT = valueOf("SO_TIMEOUT");

    public static final ChannelOption IP_TOS = valueOf("IP_TOS");
    public static final ChannelOption IP_MULTICAST_ADDR = valueOf("IP_MULTICAST_ADDR");
    public static final ChannelOption IP_MULTICAST_IF = valueOf("IP_MULTICAST_IF");
    public static final ChannelOption IP_MULTICAST_TTL = valueOf("IP_MULTICAST_TTL");
    public static final ChannelOption IP_MULTICAST_LOOP_DISABLED = valueOf("IP_MULTICAST_LOOP_DISABLED");

    public static final ChannelOption TCP_NODELAY = valueOf("TCP_NODELAY");//    Nagle  ,                  


ここでは、tcpのパラメータ設定の大部分があり、一般的に使用されているいくつかのパラメータがソースコードに注釈されています.パフォーマンスに大きな影響を及ぼすのはALLOCATORで、一部の資料によると、オープンオブジェクトプールの性能はオープンしないよりはるかに高く、Nettyは4.1バージョンからデフォルトのオプションをオープンオブジェクトプールに設定しています.ここでは、valueOfの方法の実装について説明します.
    public static  ChannelOption valueOf(String name) {
        checkNotNull(name, "name");
        ChannelOption option = names.get(name);
        if (option == null) {
            option = new ChannelOption(name);
            ChannelOption old = names.putIfAbsent(name, option);
            if (old != null) {
                option = old;
            }
        }
        return option;
    }

この方法は主にnamesという配列を維持しており,以下のように定義されていることがわかる.
private static final ConcurrentMap names = PlatformDependent.newConcurrentHashMap();
PlatformDependent.newConcurrentHashMap()の具体的な実装を見てみましょう
    public static  ConcurrentMap newConcurrentHashMap() {
        if (CAN_USE_CHM_V8) {
            return new ConcurrentHashMapV8();
        } else {
            return new ConcurrentHashMap();
        }
    }

ここを見て私はまたショックを受けました.Netty自身はjava 8以下のバージョンに対応するためにConcurrentHashMapV8を実現しました.これは6000行以上のコードのクラスであることを知っています.のNettyの開発者は本当に狂魔の称号を最適化したと言わざるを得ない.bootstrap.group().channel().childHandler().option()という一連のチェーン呼び出しを解析し、次に次の一連のbootstrap.bind(serverPort).sync()を見てみましょう.ここではまずbind()の方法を見てみましょう
    public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
    }

ここでは、ポート番号に基づいてInetSocketAddressオブジェクトを作成し、bind(SocketAddress localAddress)メソッドを呼び出します.
    public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);
    }

ここで、validate()メソッドは、AbstractBootstrapクラスのgroupおよびchannelFactory属性が空であるかどうかを検証し、doBind()メソッドを実行する.
    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();//   Channel       
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            //          
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            //          ,          ,        
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        //            
                        promise.setFailure(cause);
                    } else {
                        //        
                        promise.executor = channel.eventLoop();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

まずregFutureオブジェクトがinitAndRegister()メソッドで生成された方法を見てみましょう.
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
             //     Channel
            channel = channelFactory().newChannel();
            //       Handler
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
            }
            // Channel         ,      GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        //  channel   Reactor   
        ChannelFuture regFuture = group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }
channelFactory().newChannel()ここで呼び出されるのはAbstractBootstrapchannelFactory属性のnewChannel()メソッドであり、ここで最終的に得られるのは、以前に付与されたchannelFactoryオブジェクトの場合に基づいてNioServerSocketChannelのインスタンスオブジェクトである.channelオブジェクトが得られた後、init(channel)が呼び出されて初期化され、この方法はサブクラスServerBootstrapによって実現される.
    @Override
    void init(Channel channel) throws Exception {
        final Map, Object> options = options();
        synchronized (options) {
            channel.config().setOptions(options);// options      DefaultChannelConfig   
        }

        final Map, Object> attrs = attrs();
        synchronized (attrs) {
            for (Entry, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey key = (AttributeKey) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry, Object>[] currentChildOptions;
        final Entry, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }

        p.addLast(new ChannelInitializer() {
            @Override
            public void initChannel(Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {//   ServerBootstrapAcceptor         ,        mainReactor   Channel   subReactor
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

この方法は長いように見えますが、主に2つのことをしました.1つは、親と子のoptionsとattrsを割り当てることです.二、channelpipeline属性の処理チェーンを構築する.ここでは主にaddLast()の方法を見てみましょう.DefaultChannelPipelineによって具体的に実現されています.見てみましょう.
    @Override
    public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
    }

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        if (handlers == null) {
            throw new NullPointerException("handlers");
        }

        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }

        return this;
    }

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);//  handler          sharable 

            newCtx = newContext(group, filterName(name, handler), handler);//filterName(name, handler)    name,newContext   newCtx   handler、inbound、outbound、name、pipeline       

            addLast0(newCtx);// newCtx  tail    

            if (!registered) {//  channel   eventloop   registered  true,       
                newCtx.setAddPending();//CAS   newCtx handlerState  
                callHandlerCallbackLater(newCtx, true);//  newCtx pendingHandlerCallbackHead next  
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

最終的には、3つのパラメータのaddLast()メソッドが実行され、送信されたgroupおよびnameはいずれも空である.この方法では主にchanneleventloopに登録されているかどうかに応じて異なる処理を行い、handlerチェーンを初期化する.initAndRegister()メソッドに戻り、init(channel)を解析した後、channelをReactorスレッドプールに登録するもう一つの重要なregister()メソッドを見てみましょう.この方法はNioEventLoopGroupの親MultithreadEventLoopGroupで実現される
    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

    @Override
    public EventLoop next() {
        return (EventLoop) super.next();
    }

    @Override
    public EventExecutor next() {
        return chooser.next();
    }

ここでのchooserの前の記事では、スレッドプールに設定されたスレッド数が2のべき乗次数で異なる実装があるかどうかを分析したが、next()メソッドは配列のNioEventLoopオブジェクトを返し、このクラスのregister()メソッドは、最終的にはchannelが登録されたDefaultChannelPromiseオブジェクトを返し、具体的にはここでは深くない.これでinitAndRegister()メソッドの解析が完了し、doBind()メソッドに戻ってdoBind0()メソッドの解析を継続します.
    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

ここでは、バインド操作regFuture.isSuccess()が実行されるのは、channel、すなわちchannel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE)の登録が成功した場合のみであり、そうでなければpromiseに直接登録失敗を書き込むことはできない.ここでのpromiseDefaultChannelPromiseのタイプのオブジェクトであり、このクラスはFutureから継承され、特殊なFutureのオブジェクトと考えられる.bind(localAddress, promise)メソッドは最終的にtailノードにバインドされ、最終的にDefaultChannelPipelineの内部クラスのunsafeにバインドされ、呼び出しチェーンは非常に深く、ここでは展開されません.最後にジャンプしてbootstrap.bind(serverPort).sync()sync()を見てみると、bootstrap.bind(serverPort)promiseを返したと分析されていたので、sync()法はDefaultChannelPromiseによって実現された.
    @Override
    public ChannelPromise sync() throws InterruptedException {
        super.sync();
        return this;
    }

続いてDefaultPromiseクラスについて
    @Override
    public Promise sync() throws InterruptedException {
        await();
        rethrowIfFailed();
        return this;
    }

    @Override
    public Promise await() throws InterruptedException {
        if (isDone()) {
            return this;
        }

        if (Thread.interrupted()) {
            throw new InterruptedException(toString());
        }

        checkDeadLock();

        synchronized (this) {
            while (!isDone()) {
                incWaiters();
                try {
                    wait();
                } finally {
                    decWaiters();
                }
            }
        }
        return this;
    }

通常、この方法はwhileサイクル中に常に存在し、sync()メソッドに実行されるスレッドがブロックされるため、bootstrap.bind(serverPort).sync()以降のコードは達成できないことが分かる点に注意すべきである.
まとめServerBootstrapの使用方法は非常に固定されており、ほとんどの通常の使用では初期化コードテンプレートが適用されます.主なことは、ServerBootstrapの各属性を付与し、Channel、バインドユーザ定義Handler、およびChannelを1つのeventloopに登録することであり(ここで特に強調するのは、Channelは1つのeventloopにのみバインドされる)、最後にローカルポートをバインドしてIOイベントをリスニングすることである.