JavaスレッドプールThreadPoolExecutor(jdk 1.8ベース)(二)

12093 ワード

前回はThreadPoolExecuterのexecute法の具体的な実行過程を分析したが,この主な分析の中のいくつかの重要な関数である.http://blog.csdn.net/youxitongyongming/article/details/77751874
  • private boolean addWorker(Runnable firstTask,boolean core)はまずパラメータを分析し、firstTaskは私たちのユーザーが入力した実行する必要があると考えているcoreパラメータであり、真の場合、現在のスレッド数がcorePoolSizeより小さいことを示し、偽はスレッド数がcorePoolSizeより大きいことを意味する.この方法は主に2つの大きな論理があり,第1の部分はCAS操作によって新しい現在のスレッド数:
  • に従う.
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
    
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
    
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    2つの大きなfor無限ループ体は,外側のループが現在のスレッドプールの状態を絶えず検出し,Running状態にないことが発見されるとすぐにfalseに戻る.前の論文の解析から,スレッドプールがshutdownメソッドを呼び出すと,心のタスクを受信しなくなることが分かった.さらに内層のループを見ると,主にCAS操作によりスレッドプール数を更新し,if(c o m p areAndIncrementWorkerCount(c))が更新に成功すると外層ループから飛び出し,後の論理を継続する.更新に失敗した場合は、現在のスレッドプールの状態を再確認する必要があります.CAS操作は同期操作が行われていないため、スレッドプールの状態が変わった可能性があり、再検出が必要です.状態が変化していることが判明した場合,内層サイクルから飛び出して直接次の外層サイクルを行い,スレッドプールの状態を再確認する必要がある.スレッドプールのステータスが変化していない場合は、次の内層ループを継続し、更新に成功するまでスレッドプールの数を再試行します.スレッドプール数の更新に成功すると、次の論理ブロックに入り、スレッドを追加します.
    boolean workerStarted = false;
     boolean workerAdded = false;
     Worker w = null;
     try {
         w = new Worker(firstTask);
         final Thread t = w.thread;
         if (t != null) {
             final ReentrantLock mainLock = this.mainLock;
             mainLock.lock();
             try {
                 // Recheck while holding lock.
                 // Back out on ThreadFactory failure or if
                 // shut down before lock acquired.
                 int rs = runStateOf(ctl.get());
    
                 if (rs < SHUTDOWN ||
                     (rs == SHUTDOWN && firstTask == null)) {
                     if (t.isAlive()) // precheck that t is startable
                         throw new IllegalThreadStateException();
                     workers.add(w);
                     int s = workers.size();
                     if (s > largestPoolSize)
                         largestPoolSize = s;
                     workerAdded = true;
                 }
             } finally {
                 mainLock.unlock();
             }
             if (workerAdded) {
                 t.start();
                 workerStarted = true;
             }
         }
     } finally {
         if (! workerStarted)
             addWorkerFailed(w);
     }
     return workerStarted;
    }

    5行目はまずworkerを新規作成し、このクラスの内部にはrunnableオブジェクトとスレッドオブジェクトがカプセル化されています(後で分析します)、9行目にロックをかけます.スレッドプール操作では、スレッドプールの状態変化に関わるすべてのコードが同期操作を行う必要があります.前の部分はCASで、この部分は再入可能ロックを使用しています.スレッドプールがrunning状態であると判断した後、20行workers.add(w)で、作業スレッドを1つのSetに追加します.workersは、現在のスレッドプールで生存しているスレッドを格納するためのHashSetです.あとは簡単で、最大のスレッド数を記録しました.キーは30行を見て、t.start()はスレッドを起動しました.このtは私たちが新しく作ったworkerのスレッドです.これで私たちのオンラインスレッド数がcorePoolSizeより小さいとき、開始されたタスクは直接新しいスレッドを通じて実行されます.発行されたタスクがどのように実行されているかを知るためには、Workerというクラスの分析を続けなければなりません.
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
    
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
    
        public void run() {
            runWorker(this);
        }
        ...//     AbstractQueuedSynchronizer     
    }

    このworkerはRunnableインタフェースを継承しており、内部には3つのメンバー変数があり、注釈にはどのような役割があるのかよく書かれています.runメソッドに注目すると,実際にrunWorker(this)を呼び出し,自分をパラメータとして渡す.では、この方法に引き続きフォローします(何層も呼び出しているので、目を覚ましてください...):
    final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
    }

    本体はwhileサイクルであり、現在のtaskが空であるか否かを判断し、空でないで直接このタスクを実行し、キーコードは19行目のtaskである.run()は、Runnableのrunメソッドを直接呼び出します.それは、現在のスレッドで実行することです.では、問題が発生しました.現在のスレッドはいったいどのスレッドですか.少し乱れているのではないでしょうか.急いでフローチャートを見て、関数呼び出しを見てみましょう.
    Java 线程池ThreadPoolExecutor(基于jdk1.8)(二)_第1张图片
        
    第2のステップは、ユーザー・スレッドです.(executeメソッドを呼び出すときのそのスレッド)、後辺3歩後にはすでに新しいworkerスレッドに切り替わっているので、エンドユーザーがexecuteに渡したRunnableオブジェクトは新しいスレッドで実行されています.ok!上のコードのwhileループに戻るか、条件判断の2番目の条件が戻ってgetTaskメソッドを呼び出すか.名前を見てもわかりますが、Bloに行くに違いありませんckingQueue(タスクキャッシュキュー)で次の実行するタスクを取りに行きました.
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {      
            int wc = workerCountOf(c);
    
            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
    
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

    7行目のboolean timed=allowCoreThreadTimeOut||wc>corePoolSizeは、主に現在のスレッドが破棄される必要があるかどうかを判断し、allowCoreThreadTimeOutは真であり、現在のスレッド数がcorePoolSizeを超えるかどうかにかかわらず、keepAliveTimeに基づいてスレッドの生存期間を判断する必要がある.偽の場合は2番目の文を見て、現在のスレッド数がcorePoolSizeより大きい場合はスレッドの生存期間を判断する必要があり、それより大きくない場合は生存期間を判断する必要はありません.次に、主に16-25行のtryコードブロックに注目し、スレッドの生存期間を判断する必要がある場合、rはworkQueueとなる.Poll(keepAliveTime,TimeUnit.NANOSECONDS)が返す値、つまりキャッシュキューからタスクを取り出そうとしますが、時間はkeepAliveTimeに制限されています.この時間内にタスクを取ったら戻り、取らなかったら異常を投げ出すので、このスレッドがkeepAliveTimeという時間帯で実行されていないことを示すと、スレッドは破棄されます.timeがfalseであると,スレッドの生存期間を判断する必要がないことを示し,workQueueを直接呼び出す.take()は、スレッドプールが使用するブロックキューのため、キューから新しいタスクを取得して戻るまでスレッドがブロックされます.ここまで私达のThreadPoolExecutorの大まかな流れは分析し终わって、その中にまだ多くの细かい点が完全にカバーすることができなくて、もし理解の不当な地方があるならば、みんなを歓迎して指摘して、共に交流を学びます!