ThreadPool Exectorコア方法分析

12757 ワード

ThreadPool Exector類の核心はWorker内部類であり、資源を服用し、スレッド作成のオーバーヘッドを減らす。ワーカーはRunnableインターフェースを実現し、Abstractqueued Synchronizerを継承します。内部にワークスレッドが保存されています。スレッド池の状態:Runnable、shutdown、stop、tidyng、terminated。その核心方法:1、exector()
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();     //          
        if (workerCountOf(c) < corePoolSize) {   //          ,       
            if (addWorker(command, true))    //          ,   
                return;
            c = ctl.get();
        }
	        if (isRunning(c) && workQueue.offer(command)) { //  ,                   
	        int recheck = ctl.get();                           //            ,             
            if (! isRunning(recheck) && remove(command))
                reject(command);    
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
上記の方法は、スレッドを作成するためにアドホックウォーカーを呼び出したスレッドを追加する実行ステップです。
//  core              
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);  //        

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&   //     》shutdown      task null
                ! (rs == SHUTDOWN &&  //(running  = CAPACITY ||  //                       
                //                               
                    wc >= (core ? corePoolSize : maximumPoolSize))//
                    return false;
                if (compareAndIncrementWorkerCount(c))  // CAS          
                    break retry;     // break retry     for  ,      ,           
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask); //   worker,      ,    
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock; //         
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);  //           
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;  //        
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();  //  worker run    a
                    workerStarted = true;  
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
上のaに示すウォーカー・start()は、最終的にwrkerのrunメソッドを呼び出して、最終的に次の方法を呼び出します。
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;   //      task
        w.firstTask = null;
        w.unlock(); // allow interrupts          
        boolean completedAbruptly = true;
        try {
        //  task          getTask()         
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) || //       stop
                     (Thread.interrupted() &&   //       
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();  //    
                try {
                    beforeExecute(wt, task);  //    ,   
                    Throwable thrown = null;
                    try {
                        task.run();  //  
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;  //w        ++
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);  //       ,      
        }
    }
上記の方法は主に二つの方法で起動することが重要です。getTaskはタスク・キューからタスクを取得し、processWorkerExit(w,completteed Abrapply)です。アイドルスレッドに対する処理です。
  private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?      poll  

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();  //          shutdown          
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling? timed                           
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut)) //timeout true          ,       ,  null
                && (wc > 1 || workQueue.isEmpty())) {  
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
            //              ,                    
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();  //            
                if (r != null)
                    return r;  //     
                timedOut = true;  //   , timeout  。
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
prvate void processWorket Exitは、アイドルスレッドを処理するために使用されます。
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
       if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
           decrementWorkerCount();

       final ReentrantLock mainLock = this.mainLock;
       mainLock.lock();
       try {
           completedTaskCount += w.completedTasks;
           workers.remove(w);   // w worker  
       } finally {
           mainLock.unlock();
       }

       tryTerminate();    //       

       int c = ctl.get();   //       
       if (runStateLessThan(c, STOP)) {
           if (!completedAbruptly) {
               int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
               if (min == 0 && ! workQueue.isEmpty())
                   min = 1;   //          0,        ,       
               if (workerCountOf(c) >= min)
                   return; // replacement not needed  //             ,   ,    
           }
           addWorker(null, false);   //  ,      
       }
   }
shutdown()の後、スレッドプールはshutdown状態になります。この時は新しいタスクを受け付けませんが、実行中のものと、ブロック列の中で処理待ちのものを処理します。
shutdownNow()の後、スレッド池はstop状態になります。この時は新しいタスクを受け付けず、待ち行列の中で待つジョブを処理せず、処理中のスレッドを中断する試みもあります。その後、shutdownを見てみます。
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();   //     
        try {
            checkShutdownAccess();  //         shutdown  
            advanceRunState(SHUTDOWN);  //         shutdown,             
            interruptIdleWorkers();  //       
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();   //       
    }
コールできます
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
interrupt IdleWorkers()はまずメーンロックを取得します。なぜならウォーカーsetを反復するためには、各workersを中断する前に二つの判断をする必要があります。
1、スレッドが中断されているかどうかは、何もしないことです。
2、ウォーカー.tryLock()が成功したかどうか
二つ目の判断が重要です。ワーカー類は実行可能なRunnableを実現する以外に、AQSを継承しています。自身もロックです。
tryLock()は、Workカー自身が実現するtryAcquire()方法を呼び出しています。これはAQS規定のサブクラスで実現するための鍵の取得を試みる方法です。ワーカーのロックは再入力できないため、runworkerではtaskを実行してもワーカーをロック操作します。このコードの中では、空きスレッドだけがt.interrupt(操作)を実行します。キャプチャ中断異常後、getTask()の最初の判定スレッドのプール状態にループし続ける論理は、スレッドのプールがshutdown状態であり、workQue.isEmptyの場合、return nullに進み、ワーカースレッドの終了ロジックが実行される。tryTerminate()実行フロー:
1、スレッド池が終了プロセスに入る必要があるかどうかを判断する(shutdown状態+workQue.isemptyまたはstop状態になる場合のみ、必要です)
2、スレッド池にスレッドがあるかどうかを判断すると、interrupt IdleWorkers(ONLY_)があります。ONE)アイドルスレッドを中断しようとしました。(このロジックこそ、再度中断信号を送ることができます。タスクを取得しているスレッドを中断します。)
3、ステータスがSHUTDOWNなら、work Queも空きました。実行中のウォーカーもなくなりました。terminatedを開始します。まずロックをかけて、スレッド池をtidyng状態にします。その後、サブクラスで実現するterminated()を呼び出して、最後にスレッド池をterminated状態にします。そして、すべての待ちスレッド池がこのCoditionのスレッドを終了し、shotdwnnowはそのままスレッド池の状態をstopにして、すべてのスレッドに対して中断信号を送ります。
まとめ:1、threadpool Exector内部にはAtomicInteger変数があります。スレッドの数と状態を統計します。その状態はruning、shutdown、stop、tidyng、terminatedです。小到着2、threadpool Exectorの内部には、スレッドを作成するための内部クラスのワーカーがあります。その実行ステップは、スレッド数がコアスレッド数より小さい場合は、スレッドを作成し、そうでなければタスクをタスクキューに提出し、タスクのキューがいっぱいになったら、スレッド数が最大スレッド数より小さいかどうかを確認し、小さい場合はスレッドを作成する。異常がないように投げます。3、ウォーカーのrunworker方法(新規スレッドとなります)は、ユーザーが定めたタスクを先に実行し、実行が完了したら、ジョブキューにジョブを取りに行って実行を続けます。タスクのgettask()メソッドでは、スレッドの数がコアスレッド数より小さいかどうかを判定し、それより大きい場合はタスクキュー(ブロック)を所定時間のpoll操作し、ない場合はnullに戻ります。コアスレッド数に等しい場合、ずっとブロックされます。nullに戻ったら、アイドルスレッドの廃棄作業を行います。4、アイドルスレッドの廃棄作業はスレッドをスレッドセットから削除し、スレッドプールの廃棄を試みている(この方法ではアイドルスレッドに対して中断操作を行い、スレッドを廃棄する)。また、スレッド数がコアスレッド数より小さいと発見された場合は、空スレッドも作成されます。5、shutdown()方法はスレッドプールの廃棄に用いる。スレッドの状態をshutdown状態にして、タスクの提出を停止し、アイドルスレッドを中断してスレッドを廃棄します。タスクのキューと実行中のタスクについては、続行します。アイドルスレッドを破壊する際は、ロックを取得します。(この時点で作業中のスレッドのロックが解除されていないと、取得できません。)その後、切断方法を呼び出します。クエストをブロックしているコアスレッドに対しては、tryTerminate()では卓一が中断します。6、shutdownnow()はスレッド池の状態をstopとし、すべてのスレッドに対してinterrupt()操作を行い、実行していないジョブについてはリストとして返します。