Javaスレッドプールのコアパラメータをソースコードで理解

16003 ワード

背景
Java開発者として、スレッドプールには決して見慣れていない.普段の仕事でも、面接でも、スレッドプールは必ずできる知識点だ.また、その表面を知ることができず、理解が不十分で、実戦でOOMが現れやすく、面接でうつ伏せになる可能性もある.
パラメータの意味
実は、スレッドプールを研究したことがあるならば、実は難しくなくて、彼のパラメータは多くなくて、java.util.concurrent.ThreadPoolExecutorの中のパラメータはこれらを列挙します.
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  • corePoolSize:スレッドプール内のコアスレッド数は、タスク実行がない場合でも存在する.(パラメータの構成を考慮しない:allowCoreThreadTimeOut,allowCoreThreadTimeOutは、コアスレッドのタイムアウトを許可するかどうかは文字通り分かりますが、一般的には設定する必要はありません.本稿では考慮しません)
  • maximumPoolSize:スレッドプールに存在する許容最大スレッド数.
  • keepAliveTime:スレッドプール内のスレッドがコアスレッド数を超えると、この余分な空きスレッドは新しいタスクの実行のタイムアウト時間を待つ.例えば、コアスレッド数が1、最大スレッド数が5、現在の実行スレッドが4、keepAliveTimeが60 sであると、4-1=3個のスレッドがアイドル状態で60 s待機後も新たなタスクが来ないまま破棄される.
  • unit:keepAliveTimeの時間単位.
  • workQueue:スレッドキュー、現在の時間コアスレッドが実行する、また新しいタスクが来た場合、この新しいタスクはこのスレッドキューに格納され、実行を待つ.
  • threadFactory:スレッドプールはスレッドのファクトリクラスを作成する.
  • handler:スレッドキューが同僚の実行スレッド数でいっぱいになってもmaximumPoolSizeに達した場合、この時点で新しいスレッドが来ると、どのhandlerがこのスレッドを処理するかが実行される.handlerのデフォルトのタイプは次のとおりです.
  • AbortPolicy:RejectedExecutionException異常
  • を投げ出す
  • DiscardPolicy:何もしない.
  • DiscardOldestPolicy:スレッドキューの中で最も古いタスクを捨てて、1つの空間を変えて現在のタスクを実行する.
  • CallerRunsPolicy:現在のスレッド(例えばmain)を用いてこのスレッドを実行する.


  • プロセスの実行
    私たちはパラメータの意味を知っています.では、これらのパラメータは実行中にどのように実行されているのか、まず文字でいくつかの状況に分けて説明します.
    話す前に、まず例を見てみましょう.
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), new BasicThreadFactory.Builder().namingPattern("name-%d").build());
    threadPoolExecutor.execute(new Runnable() {
               @Override
               public void run() {
                   System.out.println("test");
               }
           });
    

    長い間、私は疑問や誤解を持っていました.スレッドプールなのに、どうして毎回 ( )が必要なのでしょうか.私达がスレッドを学ぶことを始める时まずnew Thread()を学んで、それからまたnew Runnable()を学んで、ゆっくりとこの2つを1つの坛に混ぜて、実はnew Runnable()は新しいスレッドがなくて、ただ1つの运行可能な任务を新筑して、1つの普通の対象で、ははは、これは1つのとても愚かな间违った认识であるべきです.上記の具体的な意味に戻ります.
  • 新しい実行タスクが追加すると、現在実行されているスレッドはcorePoolSize未満となり、この新しいタスクを実行するためにオンラインスレッドプールに新しいスレッドが作成される.
  • 新たに実行するタスクを追加すると、現在実行されているスレッドがcorePoolSize以上になり、このとき、この新しいタスクをスレッドキューworkQueueに追加する必要があり、スレッド内のスレッド実行が1つのタスクを完了すると、すぐにキューから1つのタスクを実行する.
  • は2を迎えて、もし行列もいっぱいになったら、どうしますか?maximumPoolSizecorePoolSizeより大きいと、この新しいタスクを処理するためにスレッドが新規に作成され、総実行スレッド数がmaximumPoolSizeに達することが分かる.
  • 総実行スレッド数がmaximumPoolSizeに達したら、新しいタスクが来たらどうしますか?以上のような拒否ポリシーが実行する必要があるhandlerである、構成されたポリシーに従って処理が行われ、デフォルトでは構成されていない場合、AbortPolicyが使用する.
  •  private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
    

    ソース検証
    上記の流れが正しいとどう判断しますか?私达はソースコードにフォローして上の流れをよく见ることができて、実はスレッドプールの実行するコードは比较的に简単で、変动を见て、ソースコードを见て、掌握するのはもっと深いはずです.
    まずexecute()の方法を見てみましょう
    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            // ctl         ,                 ;
            int c = ctl.get();
            // 1.               
            if (workerCountOf(c) < corePoolSize) {
                //     worker(  )  .
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            // 2.      addWorker    ,          
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                //    ,    ;
                if (! isRunning(recheck) && remove(command))
                    //         ,    ,      
                    reject(command);
                //           0
                else if (workerCountOf(recheck) == 0)
                    //                   
                    addWorker(null, false);
            }
            // 3.        ,         
            else if (!addWorker(command, false))
                // 4.         ,       
                reject(command);
        }
    

    通常のプロセスでは、理想的な環境しか考えられません.上の4つのステップに分けて、ちょうど上の文字の説明に対応することができます.
    考えるのが好きな学生は、2歩目、列に入った後、いつこの新しい加入を実行したのか、定時的な任務があるのだろうか.いいえ、そうではありません.このaddWorker()の方法を見てみましょう.
    private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            //     
            for (;;) {
                int c = ctl.get();
                //          ;
                int rs = runStateOf(c);
                ...
                //     
                for (;;) {
                    //            
                    int wc = workerCountOf(c);
                    //             ,      false
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    //     ,            1
                    if (compareAndIncrementWorkerCount(c))
                        //       
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    // 1   ,        ,         ,    ;
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                //            Runnable
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    ...
                    //   hashSet             
                    workers.add(w);
                    workerAdded = true;
                    if (workerAdded) {
                       //    worker,worker            ,         Runnable
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    

    上記のaddWorker()の方法では、t.start()によってスレッドが起動する.Workerというクラスはjava.util.concurrent.ThreadPoolExecutor.Workerに存在する、定義は以下のように比較的重要なコードのみを保持する.
     private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable{
                 Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }
    
            /** Delegates main run loop to outer runWorker  */
            public void run() {
                runWorker(this);
            }
            
            final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                while (task != null || (task = getTask()) != null) {
                       ....
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        ...
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
            }
    

    したがって、t.start()の場合、実際には、runWorker(this);の方法が実行するスレッドが新規に作成され、この方法にはwhileのループがあり、getTask()はキューからタスクを取得する.したがって、ここでは、キューに格納されたタスクがいつ実行されたかを解くことができ、いずれかのコアスレッドが空き出すと、キュー内のタスク実行をループして取得する.各コアスレッドと新しく作成されたスレッドは、あなたが転送したRunnableを実行するために同期されたrunの方法です.
    プロセス全体がはっきりしているはずです.
    以上述べたように、コアパラメータの差は多くありませんが、keepAliveTimeというパラメータはソースコードでどのように使われていますか?getTask()の方法でキューからタスクを取り、この方法のコードを見てみましょう(非主要なものを省略します).
    private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
                ...
                 int wc = workerCountOf(c);
    
                // Are workers subject to culling?
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
                try {
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    

    主にタスクを取るためのものであるここでは、poll()はブロックされず、take()はブロックされているので、pollを使用してデータを取ると設定のタイムアウトに達して、引き続き実行する、設定時間を超えてもタスクが入ってこない場合は、timedOutをtrueとしてnullに戻る.このtimedOutは、上のifの判断を制御し、最終的にcompareAndDecrementWorkerCount()を制御する方法は、実行するスレッド数を1つ減らすことであり、次回また満杯になると新規作成するので、このAliveは失効する.
    まとめ
    全体的に、ソースコードから見ると、問題は権威的にいくつかの問題を解くことができる.时にはソースコードもそんなに深くないようですか?