Dubboソース分析------dispatcherとThreadPool

14018 ワード

Dispatcher
Dispatcherは、イベントがどのように配布されるかを決定するポリシーであり、どのイベントがスレッドプールに配布されるか、または現在のスレッドで直接実行されるかを決定します.
まずインタフェースの定義を見てみましょう
@SPI(AllDispatcher.NAME)
public interface Dispatcher {

    @Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"}) //            
    ChannelHandler dispatch(ChannelHandler handler, URL url);

}

Dispatcherのいくつかの実装は次のとおりです.
all=com.alibaba.dubbo.remoting.transport.dispatcher.all.AllDispatcher
direct=com.alibaba.dubbo.remoting.transport.dispatcher.direct.DirectDispatcher
message=com.alibaba.dubbo.remoting.transport.dispatcher.message.MessageOnlyDispatcher
execution=com.alibaba.dubbo.remoting.transport.dispatcher.execution.ExecutionDispatcher
connection=com.alibaba.dubbo.remoting.transport.dispatcher.connection.ConnectionOrderedDispatcher

拡張メカニズムにより、URLのパラメータ(diapatcher/dispather/channel.handler)に基づいて対応するインプリメンテーションが取得され、設定されていない場合、デフォルトではSPI上のインプリメンテーション、すなわちallのインプリメンテーションが使用される
AllDispatcher
public class AllDispatcher implements Dispatcher {
   
    public static final String NAME = "all";
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }

}

public class AllChannelHandler extends WrappedChannelHandler {
    
    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    public void connected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService(); 
        try{
            cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED));
        }catch (Throwable t)  {//ERROR}
    }
    
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService(); 
        try{
            cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.DISCONNECTED));
        }catch (Throwable t)  {//ERROR}
    }

    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t)  {//ERROR}
    }

    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService cexecutor = getExecutorService(); 
        try{
            cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CAUGHT, exception));
        }catch (Throwable t) {//ERROR}
    }

    private ExecutorService getExecutorService() {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) { 
            cexecutor = SHARED_EXECUTOR;
        }
        return cexecutor;
    }
}

この場合のイベントはすべてスレッドプールで処理されることがわかります.親WrappedChannelHandlerの初期化を直接呼び出す構造方法を参照してください.
    public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        //             
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);

        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
            componentKey = Constants.CONSUMER_SIDE;
        }
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    }

ExecutionDispatcher
/**
 *             
 * 
 * @author chao.liuc
 */
public class ExecutionDispatcher implements Dispatcher {
    
    public static final String NAME = "execution";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new ExecutionChannelHandler(handler, url);
    }

}

public class ExecutionChannelHandler extends WrappedChannelHandler {
    
    public ExecutionChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    public void connected(Channel channel) throws RemotingException {
        executor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED));
    }

    public void disconnected(Channel channel) throws RemotingException {
        executor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.DISCONNECTED));
    }

    public void received(Channel channel, Object message) throws RemotingException {
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    }

    public void caught(Channel channel, Throwable exception) throws RemotingException {
        executor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CAUGHT, exception));
    }
}

Allと似ています(公式サイトでは、メッセージのみをスレッドプールに送信するように要求し、応答、応答、その他の接続切断イベント、ドキドキなどのメッセージを含まず、IOスレッド上で直接実行します.そうではありませんね)
DirectDispatcher
/**
 *       。
 * 
 * @author chao.liuc
 */
public class DirectDispatcher implements Dispatcher {
    
    public static final String NAME = "direct";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return handler;
    }
}

すべてのメッセージはスレッドプールに送信されず、すべてIOスレッド上で直接実行されます.ここでは、入力されたhandlerを返すだけで、スレッドプール処理には移行しません.Handlerでは、装飾者モードを使用しています.他のDispatcherはHandlerを1層包装します.この層はスレッドプールに配布されています.包装しなければ、元のプロセスに戻ります.
MessageOnlyDispatcher
/**
 *   message receive     .
 * 
 * @author chao.liuc
 */
public class MessageOnlyDispatcher implements Dispatcher {

    public static final String NAME = "message";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new MessageOnlyChannelHandler(handler, url);
    }

}
public class MessageOnlyChannelHandler extends WrappedChannelHandler {
    
    public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {
            cexecutor = SHARED_EXECUTOR;
        }
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

}

リクエスト応答メッセージのみがスレッドプールに送信され、他の接続はイベント、心拍数などのメッセージを切断し、IOスレッド上で直接実行されます.
ConnectionOrderedDispatcher
public class ConnectionOrderedDispatcher implements Dispatcher {

    public static final String NAME = "connection";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new ConnectionOrderedChannelHandler(handler, url);
    }

}
public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {

    protected final ThreadPoolExecutor connectionExecutor;
    private final int queuewarninglimit ;
    
    public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
        String threadName = url.getParameter(Constants.THREAD_NAME_KEY,Constants.DEFAULT_THREAD_NAME);
        connectionExecutor = new ThreadPoolExecutor(1, 1,
                                     0L, TimeUnit.MILLISECONDS,
                                     new LinkedBlockingQueue(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
                                     new NamedThreadFactory(threadName, true),
                                     new AbortPolicyWithReport(threadName, url)
            );  // FIXME       connectionExecutor!
        queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
    }

    public void connected(Channel channel) throws RemotingException {
       //....
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED));
       //....
    }

    public void disconnected(Channel channel) throws RemotingException {
        //....
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.DISCONNECTED));
        //....
    }

    public void received(Channel channel, Object message) throws RemotingException {
                //....
        cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.RECEIVED, exception));
        //....
    }

    public void caught(Channel channel, Throwable exception) throws RemotingException {
        //....
        cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CAUGHT, exception));
        //....
    }
    
    private void checkQueueLength(){
        if (connectionExecutor.getQueue().size() > queuewarninglimit){
            logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: "+connectionExecutor.getQueue().size()+" exceed the warning limit number :"+queuewarninglimit));
        }
    }
}

このタイプでは、イベント実行スレッドプールに加えて、スレッドプールクラス内にスレッドが1つしかないスレッドプールを初期化し、接続が切断され、接続されたイベントをスレッドプール処理に配置します.
Dispatcherの内容は比較的簡単で、大体過ぎました.また、普段はallを使用しています.このようにすべてのイベントをスレッドプールに格納して処理すると、リクエストが来たときにスレッドプールがいっぱいになったと仮定してエラーを報告しますが、allはエラーイベントもスレッドプールに戻して返します.このときスレッドプールがいっぱいになったら、このエラー情報は送信できません.コンsumerはタイムアウトを待機します
ThreadPool
Dispatcherでスレッドプールを初期化するコードは次のとおりです.
ExecutorService executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class)
                       .getAdaptiveExtension().getExecutor(url);

拡張メカニズムで対応するタイプのThreadPoolを取得し、インタフェース定義を見てみましょう.
@SPI("fixed")
public interface ThreadPool {
    
    @Adaptive({Constants.THREADPOOL_KEY})
    Executor getExecutor(URL url);

}

拡張メカニズムによれば、デフォルトで使用されているfixedという実装が知られており、@Adaptiveにthreadpoolというkeyが宣言されており、threadpoolという属性を構成してどの実装を使用するかを指定できることを証明している.以下、Dubbo内部の3つのスレッドプール実装を参照する.
/**
 *                   ,      ,   :Executors.newFixedThreadPool()
 * 
 * @see java.util.concurrent.Executors#newFixedThreadPool(int)
 * @author william.liangf
 */
public class FixedThreadPool implements ThreadPool {

    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, 
                queues == 0 ? new SynchronousQueue() : 
                    (queues < 0 ? new LinkedBlockingQueue() 
                            : new LinkedBlockingQueue(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}
/**
 *        ,          ,         ,   :Executors.newCachedThreadPool()
 * 
 * @see java.util.concurrent.Executors#newCachedThreadPool()
 * @author william.liangf
 */
public class CachedThreadPool implements ThreadPool {

    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, 
                queues == 0 ? new SynchronousQueue() : 
                    (queues < 0 ? new LinkedBlockingQueue() 
                            : new LinkedBlockingQueue(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}
/**
 *         ,    ,      。
 * 
 * @author kimi
 */
public class LimitedThreadPool implements ThreadPool {

    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS, 
                queues == 0 ? new SynchronousQueue() : 
                    (queues < 0 ? new LinkedBlockingQueue() 
                            : new LinkedBlockingQueue(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}
  • FixedThreadPool:スレッドプールのデフォルトコアスレッド数と最大スレッド数は200で、threadsプロパティで構成できます.キューのデフォルトは0です.キューの数が0の場合、SynchronousQueueオブジェクトが0未満の場合は、無境界キューLinkedBlockingQueueを使用します.そうでない場合は、境界のあるLinkedBlockingQueue
  • を使用します.
  • CachedThreadPool:コアスレッド数を構成するためにFixedThreadPoolよりも多くのcorethreadsのプロパティを設定し、aliveプロパティ構成keepAliveTimeパラメータ、その他の
  • に類似する
    3.LimitedThreadPool:このタイプのkeepAliveTimeはLong.MAX_VALUE、つまりスレッド数をほぼ自動的に減らすことはありません