スレッドプールとThreadPoolExecutor学習ノート


Javaを勉強したり使ったりする時間も短くないので、スレッドプールに関する技術の詳細をよく勉強する機会を見つけたいと思っていましたが、最近はちょうど暇なので、暇を見つけてJDK 1.7スレッドプールの実現を検討しました.スレッドプールテクノロジーは、サーバ側の開発においてもクライアント側の開発においても重要です.スレッドプールの最大の利点は、オブジェクトの作成と破棄によるリソース消費を削減することです.オブジェクト向けプログラミングでは、オブジェクトの作成と破棄はリソースを消費します.Javaでは、Java仮想機会が各オブジェクトを追跡し、オブジェクトが破棄された後にゴミ回収を行うことが「プール化テクノロジー」の原因です.サーバ側の開発はあまり詳しくありません.クライアント開発にとって.UI以外のスレッドで作業を行う必要があることがよくありますが、毎回スレッドを作成すると、CPU、メモリ、バッテリーが制限されている携帯電話では受け入れられません.そのため、Androidの開発にもスレッドプールの技術を採用し、リソースの消費を回避しています.私はAndroidプログラム猿で、使用言語はJavaなので、JDKのスレッドプール実装を使って説明します.クラスを理解するには、まずその構造関数から学ぶ必要があります.まず、一般的な構造関数を見てみましょう.
/** * @param corePoolSize              ,              corePoolSize   * @param maximumPoolSize                * @param keepAliveTime             corePoolSize,      idle      ,              * @param unit keepAliveTime      * @param workQueue       Runnable       * @param threadFactory  executor            factory */
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

コンストラクション関数から分かるように、ThreadPoolExecutorは、コミットされたRunnableオブジェクトを処理し続けるスレッドプールを作成することができ、スレッドプールの実行時間後のスレッドの個数はcorePoolSizeに等しく、maximumPoolSizeはスレッドプールの中で最大のスレッド個数であり、スレッドプールのスレッドの個数がcorePoolSizeより大きく、idle状態にあるスレッドによって、keepAliveTimeを通過するとスレッドは破棄されます.スレッドプールはBlockingQueueを使用してコミット実行Runnableを保存し、queueが空の場合、スレッドは待ち続け、ブロックされます.次に、ThreadPoolExecutorというクラスの中で常に貫かれている変数と方法についてお話しします.これらはThreadPoolExecutorクラスの設計の巧みさです.ctlはatomic integerであり、スレッドプールの実行状態と現在のスレッドプールのworkerスレッド数を保存します.スレッドプール全体の最大容量は2^29-1であり、ctlの低い29ビットはworkerスレッドの個数を格納し、ctlの3ビットはRUNNING、SHUTDOWN、STOPなどのスレッドプールの状態を格納するために使用される.ソースコードにおけるこのような設計についての解釈は2^29-1もすでに非常に大きな数であり,後で拡張するとatomic longに置き換えるのも便利であり,またThreadPoolExecutorではスレッドプール状態やworkerスレッド個数を頻繁に照会する必要があり,シフト演算効率が比較的高いため,このような方式で設計されている.くだらないことは言わないで、次は直接ソースに行きます.
    //           worker    
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    //worker       ,2^29 - 1
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    //          
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    //  ctl  3 ,       
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //  ctl  29 ,  worker    
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

次はThreadPoolExecutorのいくつかの重要な方法です.
public void execute(Runnable command) {
        //    ,     
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        //  worker      corePoolSize
        if (workerCountOf(c) < corePoolSize) {
            //  addWorker  ,       
            //          ,  command           
            //          ,    
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //  worker     >= corePoolSize
        //         ,   command        Queue 
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //         ,    ,  Queue   command,       
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //           ,  worker       0,       
            //                 corePoolSize maximumPoolSize     
            //        corePoolSize            ,       maximumPoolSize    
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //         running
        //      running,        ,       
        else if (!addWorker(command, false))
            reject(command);
    }

executeメソッドのまとめについて、(1)スレッドプールが新しいタスクを受信した場合,現在のスレッド数がcorePoolSizeより小さい場合,addWorkerメソッドの新規スレッドを直接呼び出し,commandを最初のタスクとして実行する.(2)現在のスレッド数がcorePoolSize以上である場合、スレッドプールの実行状態を最初に判断し、ステータスが正常である場合、タスクをキューに追加しようとします.スレッドプールのステータスが異常であるか、キャッシュキューがいっぱいである場合は、workerスレッドを追加しようとしますが、workerスレッドの新規追加に失敗した場合はcommandを拒否します.(3)スレッド数がcorePoolSize以上で、スレッドプールの状態が正常で、タスクがキャッシュキューに正常に追加された場合、スレッドプールの状態を再度チェックします.スレッドプールが切れた場合、タスクは拒否されます.そうでなければ、再チェック状態が正常で、workerスレッド数が0の場合、workerスレッドが追加されます.
    /** * @param firstTask           runnable   * @paran core    corePoolSize        */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            //       
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                //         
                return false;

            for (;;) {
            //  worker    
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    //         
                    return false;
                //  worker    
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                //       
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //    worker  
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // worker  set    
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //    worker  ,  runnable
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

addWorkerメソッドの概要:このメソッドは主にworkerオブジェクトの追加を完了し、追加に成功するとrunnableを実行しますが、runnableは空になる可能性があります.
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //         ,worker addWorker      ,run       , run          。     ,  while   ,    getTask blockingQueue        ,        ,      ,               
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted. This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

runWorker()の方法の核心はwhileループを介してgetTaskを呼び出してblockingQueueからタスクを取り出して実行することであり,キャッシュキューが空の場合,新しいタスクがキャッシュキューに格納されるまでスレッドがブロックされる.これでスレッドプールの簡単な学習ノートはこれで終わり、スレッドプールはモバイル開発にとって非常に重要な意義を持っており、把握しなければならない.