Dubboソース分析のスレッド池

58347 ワード

記事の目次
  • クライテリア
  • Dubboスレッド池分類
  • Dubboスレッド工場
  • スレッド池拒否ポリシー
  • CachedThreadPool
  • FixedThed Pool
  • LimitedThreadPool
  • Eager ThreadPool
  • 前言nettyを学んだことがあると信じていますが、ChannelHandlerの各々はそのEventLoop(I/O )を通じてそのイベントを処理していますので、このスレッドをブロックしないことが大切です。これは全体のI/Oに負の影響を与えます。Dubboもそうであるので、Dubboは、イベントを非同期的に処理するためのいくつかのスレッドプールを定義している。
    Dubboスレッド池の分類Dubbo-2.6.5バージョンには三つのスレッドがあります。
  • CachedThreadPool
  • FixedThreadPool
  • LimitedThreadPool
  • GitHub上のDubboの最新のソースコードの中でまた新しいスレッドプールを提供しました。
  • EagerThreadPool
  • よく知っていますか?EagerThreadPoolを除いて、他のスレッドプールはjava.util.concurrent.Executorsを提供してくれたようです。ただ、Dubboは自分でもう一度実現しました。カスタムパラメータを調整してください。(スレッドの池に詳しくないのはこの記事です。JAVAスレッドの池ThreadPool Exector詳細は分かりません。)
    Dubboスレッド工場
    スレッド池を見る前に、まずDubboカスタムスレッド池工場を見てください。
    public class NamedThreadFactory implements ThreadFactory {
    
    	//         
        protected static final AtomicInteger POOL_SEQ = new AtomicInteger(1);
    	//            
        protected final AtomicInteger mThreadNum = new AtomicInteger(1);
    	//        
        protected final String mPrefix;
    	//          
        protected final boolean mDaemon;
    	//      
        protected final ThreadGroup mGroup;
    	//          :    
        public NamedThreadFactory() {
            this("pool-" + POOL_SEQ.getAndIncrement(), false);
        }
    	
        public NamedThreadFactory(String prefix) {
            this(prefix, false);
        }
    
        public NamedThreadFactory(String prefix, boolean daemon) {
            mPrefix = prefix + "-thread-";
            mDaemon = daemon;
            SecurityManager s = System.getSecurityManager();
            mGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
        }
    
        @Override
        public Thread newThread(Runnable runnable) {
            String name = mPrefix + mThreadNum.getAndIncrement();
            Thread ret = new Thread(mGroup, runnable, name, 0);
            ret.setDaemon(mDaemon);
            return ret;
        }
    
        public ThreadGroup getThreadGroup() {
            return mGroup;
        }
    }
    
    
    このスレッド工場は私達のスレッドプールの名前を作成し、スレッドの名前を作成しました。私達がスタック情報を確認するのに便利です。スレッドは保護/ユーザーの決定権を使用者が決定します。上のスレッド工場はdebugバージョンのソースコードです。実はGitHub上のDubboの最新のソースコードの中にスレッド工場のためにもう一つの層が封入されています。
    public class NamedInternalThreadFactory extends NamedThreadFactory {
    
        public NamedInternalThreadFactory() {
            super();
        }
    
        public NamedInternalThreadFactory(String prefix) {
            super(prefix, false);
        }
    
        public NamedInternalThreadFactory(String prefix, boolean daemon) {
            super(prefix, daemon);
        }
    
        @Override
        public Thread newThread(Runnable runnable) {
            String name = mPrefix + mThreadNum.getAndIncrement();
            InternalThread ret = new InternalThread(mGroup, runnable, name, 0);
            ret.setDaemon(mDaemon);
            return ret;
        }
    }
    
    
    何もしていないようですが、詳細はDubbo-2.6.5方法にあります。父newThreadに直接使用されているNamedThreadFactoryです。ここで使っているのはnew Threadです。何の違いがありますか?よく見てください。new InternalThreadということは、もう一回包装しました。何もしていないようです。この機能はここでは見られません。InternalThreadのソースコードを見たことがあります。ThreadNetty Nettyがあります。FastThreadLocalはここでも同じです。FastThreadLocalThreadのソースコードを参考にしました。DubboNettyの違いは何ですか?私のこの文章のJavaソースのThreadLocalに移してください。ここでは余計な話はしません。
    スレッドが拒否されたポリシー
    Dubboもスレッドプール拒否策を書き直しました。主な操作はオンラインプログラムが拒否された時にスタック情報をファイルに保存します。具体的には私が分析したこの文章を見ることができます。Dubboのソースコード分析のためのスレッドプールの拒否戦略はここでは詳しく紹介しません。
    次にスレッド池を見ます。
    CachedThreadPool
    public class CachedThreadPool implements ThreadPool {
    
        public Executor getExecutor(URL url) {
        	//           :Dubbo 
            String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
            //        :0
            int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
            //         :Integer.MAX_VALUE
            int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
            //         :0
            int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
            //         :60s
            int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
            return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, 
            		queues == 0 ? new SynchronousQueue<Runnable>() : 
            			(queues < 0 ? new LinkedBlockingQueue<Runnable>() 
            					: new LinkedBlockingQueue<Runnable>(queues)),
            		new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
        }
    }
    
    FastThreadLocalとは、スレッドが伸縮し、イベントが多いときに新しいスレッドが作成され、イベントが少ない場合には、一定時間を超えてスレッドが回収されます。ThreadLocal:実は、このCachedThreadPoolはまともな ではない。おそらく、このスレッドプールのブロック列はデフォルトCachedThreadPoolであるが、ユーザーがcache変数を構成し、その値が大きいと、使用されるブロック列はSynchronousQueueである。このとき、queuesがデフォルト値LinkedBlockingQueueを再使用すると、イベントのブロックが発生する。
    Fixed ThreadPool
    public class FixedThreadPool implements ThreadPool {
    
        public Executor getExecutor(URL url) {
            String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
            int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
            int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
            return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, 
            		queues == 0 ? new SynchronousQueue<Runnable>() : 
            			(queues < 0 ? new LinkedBlockingQueue<Runnable>() 
            					: new LinkedBlockingQueue<Runnable>(queues)),
            		new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
        }
    
    }
    
    corethreadsはコアスレッドと最大スレッドサイズが固定値であることを示しており、0においてもこのように処理されているが、ここで注意すべき点がある。FixedThreadPoolDubboスレッドプールを使用すると、デフォルトのスレッドプール数は であり、デフォルトのブロック列サイズはfixedであり、デフォルトで使用されているブロック列は200である。あなたのビジネスイベントが合併した場合、または処理時間が長い場合、ブロック列サイズ、すなわち0変数を適切に調整してください。そうでないと大量の要求が破棄されます。このスレッド池もSynchronousQueueがデフォルトで使っているスレッド池で、事故が多いと推定されています。
    LimitedThreadPool
    public class LimitedThreadPool implements ThreadPool {
    
        public Executor getExecutor(URL url) {
            String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
            int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
            int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
            int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
            return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS, 
            		queues == 0 ? new SynchronousQueue<Runnable>() : 
            			(queues < 0 ? new LinkedBlockingQueue<Runnable>() 
            					: new LinkedBlockingQueue<Runnable>(queues)),
            		new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
        }
    
    }
    
    
    このスレッドの池は面白いです。スレッドの池のスレッド数は上限まで成長してもいいです。いつまでも回収しないというのは疑似概念です。コードを見ると、スレッド設定スレッドの回収時間はqueuesに制限されています。永遠に回収しないと理解できます。これは注意すべきことと言えば、スレッド池は回収されていないので、Dubbo変数であるスレッド最大制限の値は大きすぎてはいけません。標準Long.MAX_VALUEを使用すればいいです。threadsを避けることができます。
    Eager ThreadPool
    注:このスレッド池はdubbo-2.6.5以下では実現されていません。
    まず、このスレッドのプールを紹介します。スレッドの池にあるすべてのコアスレッドが忙しくなっている時に、新しいタスクを追加すると、ブロック列に入れずに新しいスレッドを作成します。
    public class EagerThreadPool implements ThreadPool {
    
        @Override
        public Executor getExecutor(URL url) {
            String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
            int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
            int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
            int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
            int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
    
            // init queue and executor
            TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
            EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
                    threads,
                    alive,
                    TimeUnit.MILLISECONDS,
                    taskQueue,
                    new NamedInternalThreadFactory(name, true),
                    new AbortPolicyWithReport(name, url));
            taskQueue.setExecutor(executor);
            return executor;
        }
    }
    
    大体の規則は上と同じです。中には見たことがない種類が二つあります。200OOMがあります。
    まず、カスタムブロックTaskQueueを見てください。
    public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {
    
        private static final long serialVersionUID = -2635853580887179627L;
    
        private EagerThreadPoolExecutor executor;
    
        public TaskQueue(int capacity) {
            super(capacity);
        }
    
        public void setExecutor(EagerThreadPoolExecutor exec) {
            executor = exec;
        }
    	//    offer   
        @Override
        public boolean offer(Runnable runnable) {
            if (executor == null) {
                throw new RejectedExecutionException("The task queue does not have executor!");
            }
    
            int currentPoolThreadSize = executor.getPoolSize();
            //               offer
            if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
                return super.offer(runnable);
            }
    
            //                           
            if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
                return false;
            }
    
            // currentPoolThreadSize >= max
            return super.offer(runnable);
        }
    
       	//    offer   
        public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
            if (executor.isShutdown()) {
                throw new RejectedExecutionException("Executor is shutdown!");
            }
            return super.offer(o, timeout, unit);
        }
    }
    
    
    最も主要なロジックは、書き換えられたEagerThreadPoolExecutor方法の中で、この方法を書き換えることによって、スレッド池に任務を提出する時に、ブロック列がいっぱいになったと偽って、スレッド池を新規にスレッド化してタスクを実行することができます。
    TaskQueue類のofferコードを貼り付けます。
     public void execute(Runnable command) {
             /*        null         */
            if (command == null)
                throw new NullPointerException();
    
            int c = ctl.get();
            /*                     ,    addWorker       */
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            /*                   ,               ,            */
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            /*          ,                */
            //tip:                    ,      addWorker   ,          
            else if (!addWorker(command, false))
                reject(command);
        }
    
    詳しいことを知りたいのですが、このブログのスレッド池のソースコード分析はどうやってコアスレッドが破壊されないことを保証しますか?ThreadPoolExecutorのソースコードを通して、ブロック列に失敗したときは、直接executeメソッドを呼び出して、新しいスレッドを作成してタスクを実行します。書き換えたThreadPoolExecutoraddWorkerと方法が一致しました。TaskQueue類を引き続き見ます。
    //   ThreadPoolExecutor
    public class EagerThreadPoolExecutor extends ThreadPoolExecutor {
    
       	//     
        private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
    
        public EagerThreadPoolExecutor(int corePoolSize,
                                       int maximumPoolSize,
                                       long keepAliveTime,
                                       TimeUnit unit, TaskQueue<Runnable> workQueue,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        }
    
        //            
        public int getSubmittedTaskCount() {
            return submittedTaskCount.get();
        }
    
    	// ThreadPoolExecutor                 
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            submittedTaskCount.decrementAndGet();
        }
    
    	//    execute   
        @Override
        public void execute(Runnable command) {
            if (command == null) {
                throw new NullPointerException();
            }
            // do not increment in method beforeExecute!
            submittedTaskCount.incrementAndGet();
            try {
                super.execute(command);
            } catch (RejectedExecutionException rx) {
                //                 
                final TaskQueue queue = (TaskQueue) super.getQueue();
                try {
                    if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                        submittedTaskCount.decrementAndGet();
                        throw new RejectedExecutionException("Queue capacity is full.", rx);
                    }
                } catch (InterruptedException x) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException(x);
                }
            } catch (Throwable t) {
                // decrease any way
                submittedTaskCount.decrementAndGet();
                throw t;
            }
        }
    }
    
    
    書き換えスレッド池も簡単で、offerに使用される現在のジョブ数のインターフェースを提供します。EagerThreadPoolExecutor方法もよく分かります。なぜTaskQueue方法でexecuteの異常を捕捉したのかという小仲間もいるかもしれない。これはまた、executeソースに戻ります。
    //      
     else if (!addWorker(command, false))
                reject(command);
    //      
    
    このコードは、RejectedExecutionExceptionが失敗したときにThreadPoolExecutorを投げます。書き換えられていないスレッドのプールでこの異常な条件は、ブロック列がいっぱいになり、スレッド数が最大限度に達したことです。書き換えられたaddWorkerは、待ち行列の中で渋滞しています。RejectedExecutionExceptionはまた、オンラインプログラムのスレッド数が最大制限を下回る場合はTaskQueueに戻ります。offerに戻っても、新しいブロック列が満杯とは言えませんので、falseが必要です。
    //      
       if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
                return false;
       }
    //      
    
    スレッド池の分析が終わりました。提出記録を見ました。falseは当時の私達の会社の先輩が書いたもののようです。retryOfferEagerThreadPoolです。すごいですね。みんなで一生懸命勉強します。coding