JettyはどのようにNIO分析を実現するか(三)


必要な知識:
    1.IOモデル:IOとオペレーティングシステムの関係(一)JAVAのいくつかのIOの動作メカニズムと特徴(二)を参照する
    2.Jettyコンテナ:JETTY基本アーキテクチャ参照
 
1.jettyモジュール分析
詳細は公式サイトを参照してください.http://wiki.eclipse.org/Jetty/Reference/Dependencies(jettyモジュール依存)
1.1 jetty依存ツリー:
Jetty如何实现NIO分析(三)_第1张图片
 
This diagram shows the compile dependencies for the Jetty project. The external dependencies are listed on the right hand side and all other modules shown are part of the project. 
 
1.2 jettyコアモジュール(httpクライアントサービス側通信モジュール)
(公式解釈)The jetty-util,jetty-io and jetty-http jars form the core of the jetty HTTP handler(generation and parsing)that is used for both the jetty-client and the jetty-server.
次の図を示します.
私たちが普段最も関心を持っているのはclientがどのようにserverで通信し、io通信をどのように実現するかです.そのため、以下のモジュールは理解する必要があります.
jetty-client:
jetty-server:
jetty-http:
jetty-io:
jetty-util:
Jetty如何实现NIO分析(三)_第2张图片
 
1.3 jettyモジュール構造分析
注意:jettyコアコードクラスはjetty 8のselectChannelConnectorに使用されます.ここではjetty 8 APIを参照してください.jetty 9はサーバConnectorに変更されました.
jetty8 api:http://download.eclipse.org/jetty/8.1.17.v20150415/apidocs/ 
 
1.3.1依存jettyプラグインを追加し、searchから.maven.orgダウンロード:
     
<!-- jetty -->
<dependency>
      <groupId>org.eclipse.jetty.aggregate</groupId>
      <artifactId>jetty-all-server</artifactId>
      <version>8.1.18.v20150929</version>
</dependency>
依存コンポーネントは次のとおりです.
  Jetty如何实现NIO分析(三)_第3张图片
  
 
1.3.2 Jetty Connectorの実装クラス図:
 
       Jetty如何实现NIO分析(三)_第4张图片
次のようになります.
SelectChannelConnectorは各コンポーネントの組み立てを担当します
SelectSetはクライアント要求をリスニングする
SelectChannelEndPointはIOの読み書きを担当する
HttpConnectionは論理処理を担当する
サービス側全体で要求を処理するプロセスは、次のように3つの段階に分けられます.
フェーズ1:接続の傍受と確立
Jetty如何实现NIO分析(三)_第5张图片
このプロセスは主にacceptの新しい接続を担当するスレッドを起動し、傍受して対応するSelectSetに割り当て、割り当てられたポリシーはポーリングです.
SelectChannelConnectorコアメソッド:
 1.クライアント要求dispatch()を受信するSelectorManagerの作成
    
   private final SelectorManager _manager = new ConnectorSelectorManager();
    /*
     * @see org.eclipse.jetty.server.server.AbstractConnector#doStart()
     */
    @Override
    protected void doStart() throws Exception
    {
        _manager.setSelectSets(getAcceptors());
        _manager.setMaxIdleTime(getMaxIdleTime());
        _manager.setLowResourcesConnections(getLowResourcesConnections());
        _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());

        super.doStart();// 1.1   open    server    1.2            
    }

/* --------------------------1.1       port---------------------------------- */
    public void open() throws IOException
    {
        synchronized(this)
        {
            if (_acceptChannel == null)
            {
                // Create a new server socket
                _acceptChannel = ServerSocketChannel.open();
                // Set to blocking mode
                _acceptChannel.configureBlocking(true);

                // Bind the server socket to the local host and port
                _acceptChannel.socket().setReuseAddress(getReuseAddress());
                InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
                _acceptChannel.socket().bind(addr,getAcceptQueueSize());

                _localPort=_acceptChannel.socket().getLocalPort();
                if (_localPort<=0)
                    throw new IOException("Server channel not bound");

                addBean(_acceptChannel);
            }
        }
    }

/* ----------------------1.2                ,   1 ---------------------- */
    @Override
    public void accept(int acceptorID) throws IOException
    {
        ServerSocketChannel server;
        synchronized(this)
        {
            server = _acceptChannel;
        }

        if (server!=null && server.isOpen() && _manager.isStarted())
        {
            SocketChannel channel = server.accept();
            channel.configureBlocking(false);
            Socket socket = channel.socket();
            configure(socket);
            _manager.register(channel); //SelectorManager    channel
        }
    }
    /** Register a channel   channel selectSet
     * @param channel
     */
    public void register(SocketChannel channel)
    {
        // The ++ increment here is not atomic, but it does not matter.
        // so long as the value changes sometimes, then connections will
        // be distributed over the available sets.

        int s=_set++;
        if (s<0)
            s=-s;
        s=s%_selectSets;
        SelectSet[] sets=_selectSet;
        if (sets!=null)
        {
            SelectSet set=sets[s];
            set.addChange(channel);
            set.wakeup();
        }
    }

 
 
フェーズ2:クライアントのリクエストをリスニングする
Jetty如何实现NIO分析(三)_第6张图片
 
このプロセスは主に複数のスレッド(スレッド数は一般的にサーバCPUの個数、dubboのnio配置cpuコア数+1)を起動し、selectSetが管轄するchannelキューを傍受し、各selectSetに1つのselectorを維持させ、このselectorはキュー内のすべてのchannelを傍受し、リードイベントがあり、スレッドプールからスレッドを処理要求とする.
selectorManager        dispatch()

/* ------------------------------------------------------------ */
    /* (non-Javadoc)
     * @see org.eclipse.component.AbstractLifeCycle#doStart()
     */
    @Override
    protected void doStart() throws Exception
    {
        _selectSet = new SelectSet[_selectSets];
        for (int i=0;i<_selectSet.length;i++)
            _selectSet[i]= new SelectSet(i);

        super.doStart();

        // start a thread to Select
        for (int i=0;i<getSelectSets();i++)
        {
            final int id=i;
            boolean selecting=dispatch(new Runnable()
            {
                public void run()
                {
                    String name=Thread.currentThread().getName();
                    int priority=Thread.currentThread().getPriority();
                    try
                    {
                        SelectSet[] sets=_selectSet;
                        if (sets==null)
                            return;
                        SelectSet set=sets[id];

                        Thread.currentThread().setName(name+" Selector"+id);// selector  
                        if (getSelectorPriorityDelta()!=0)
                            Thread.currentThread().setPriority(Thread.currentThread().getPriority()+getSelectorPriorityDelta());
                        LOG.debug("Starting {} on {}",Thread.currentThread(),this);
                        while (isRunning())
                        {
                            try
                            {
                                set.doSelect();//Select and dispatch tasks
                            }
                            catch(IOException e)
                            {
                                LOG.ignore(e);
                            }
                            catch(Exception e)
                            {
                                LOG.warn(e);
                            }
                        }
                    }
                    finally
                    {
                        LOG.debug("Stopped {} on {}",Thread.currentThread(),this);
                        Thread.currentThread().setName(name);
                        if (getSelectorPriorityDelta()!=0)
                            Thread.currentThread().setPriority(priority);
                    }
                }

            });

            if (!selecting)
                throw new IllegalStateException("!Selecting");
        }
    }

/* ------------------------SelectChannelEndPoint     ----------------------------- */
    public void dispatch()
    {
        synchronized(this)
        {
            if (_state<=STATE_UNDISPATCHED)
            {
                if (_onIdle)
                    _state = STATE_NEEDS_DISPATCH;
                else
                {
                    _state = STATE_DISPATCHED;
                    boolean dispatched = _manager.dispatch(_handler);
                    if(!dispatched)
                    {
                        _state = STATE_NEEDS_DISPATCH;
                        LOG.warn("Dispatched Failed! "+this+" to "+_manager);
                        updateKey();
                    }
                }
            }
        }
    }

 
フェーズ3:要求の処理
Jetty如何实现NIO分析(三)_第7张图片
 
このプロセスは、クライアントが要求するたびにデータ処理プロセスであり、バックエンドのビジネス処理がSelectorが新しい要求をリスニングすることを阻害しないように、マルチスレッドでリスニング要求と処理要求の2つの段階を分離することに注目すべきである.
 
/* ------------------------------------------------------------ */
    /*
     */
    protected void handle()
    {
        boolean dispatched=true;
        try
        {
            while(dispatched)
            {
                try
                {
                    while(true)
                    {
                        final AsyncConnection next = (AsyncConnection)_connection.handle();
                        if (next!=_connection)
                        {
                            LOG.debug("{} replaced {}",next,_connection);
                            Connection old=_connection;
                            _connection=next;
                            _manager.endPointUpgraded(this,old);
                            continue;
                        }
                        break;
                    }
                }
                catch (ClosedChannelException e)
                {
                    LOG.ignore(e);
                }
                catch (EofException e)
                {
                    LOG.debug("EOF", e);
                    try{close();}
                    catch(IOException e2){LOG.ignore(e2);}
                }
                catch (IOException e)
                {
                    LOG.warn(e.toString());
                    try{close();}
                    catch(IOException e2){LOG.ignore(e2);}
                }
                catch (Throwable e)
                {
                    LOG.warn("handle failed", e);
                    try{close();}
                    catch(IOException e2){LOG.ignore(e2);}
                }
                finally
                {
                    if (!_ishut && isInputShutdown() && isOpen())
                    {
                        _ishut=true;
                        try
                        {
                            _connection.onInputShutdown();
                        }
                        catch(Throwable x)
                        {
                            LOG.warn("onInputShutdown failed", x);
                            try{close();}
                            catch(IOException e2){LOG.ignore(e2);}
                        }
                        finally
                        {
                            updateKey();
                        }
                    }
                    dispatched=!undispatch();
                }
            }
        }
        finally
        {
            if (dispatched)
            {
                dispatched=!undispatch();
                while (dispatched)
                {
                    LOG.warn("SCEP.run() finally DISPATCHED");
                    dispatched=!undispatch();
                }
            }
        }
    }

これによりJettyのNIO使用に関するパターンを大まかにまとめることができ、下図のように:
Jetty如何实现NIO分析(三)_第8张图片
1).接続の傍受と確立
2).クライアント要求のリスニング
 
3).要求の処理
最も核心は3つの異なることを隔離し,異なる規模のスレッドで処理し,NIOの非同期と通知特性を最大限に利用することである. 
 
 
1.4 jettyサービスを開始し、コンテナスレッドプールを観察する
Jetty如何实现NIO分析(三)_第9张图片
 
 
 
 
 
 
参考ブログ:
1.Jetty、Tomcat、MinaからNIOフレームワークネットワークサーバのクラシックモードを抽出する(一)2.Jetty、Tomcat、MinaからNIOフレームワークネットワークサーバのクラシックモードを抽出する(二)3.Jetty、Tomcat、MinaからNIOフレームワークネットワークサーバのクラシックモードを抽出する(3)4.JAVA NIOシリーズチュートリアル5.JettyのSelectChannelConnector分析