スレッドプールソースプロセスの整理


ずっとスレッドプールを使っていて、最近暇なので、実行プロセスを見たいです!ついでに記録を!構造方法を一目見る
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {        //                   !
    if (corePoolSize < 0 ||                            //          0
        maximumPoolSize <= 0 ||       	//           0
        maximumPoolSize < corePoolSize ||    //                
        keepAliveTime < 0)                     //       0
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;     
}

excuteの方法を見てみましょう
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));  //AtomicInteger                  worker  
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();            //AtomicInteger                  worker  
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) { 
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))    //      ,       ,     
            reject(command);
        else if (workerCountOf(recheck) == 0)  //     0      
            addWorker(null, false);
    }
    else if (!addWorker(command, false))//    
        reject(command);   //    
}

ctl AtomicIntegerタイプのスレッドプールの動作と動作を表すworkerの数private static final int RUNNING=-1<private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { // int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && // running ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // null , returnfalse for (;;) { int wc = workerCountOf(c);// if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) // core = true wc corePoolSize , maximumPoolSize boolean return false; // false if (compareAndIncrementWorkerCount(c)) // workercount , break retry; // ! c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) // , continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; // boolean workerAdded = false; // start Worker w = null; try { w = new Worker(firstTask); // worker final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); // , workers int s = workers.size(); if (s > largestPoolSize) // , ! largestPoolSize ? largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); // } if (workerAdded) { t.start(); // start workerStarted = true;// true } } } finally { if (! workerStarted) addWorkerFailed(w);// workers ! workercount } return workerStarted; }
まとめ:原子性の増加workerのcountが追加に成功した後!スレッドをwrokerクラスにカプセル化し、スレッドの状態が正常であると判断したら、workerをworker対列に追加し、スレッドのstartメソッドを呼び出します!runメソッド実行!次にrunworkerメソッドを呼び出してworkerクラスを見てみましょう
 Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);  //          
    }

 public void run() {
        runWorker(this);
    }

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();//       
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||    //      >=stop  
                 (Thread.interrupted() &&              //       
                  runStateAtLeast(ctl.get(), STOP))) &&   //ctl,          ,   !
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();   //    task      
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {   //           ,   ,completedAbruptly    true      false
        processWorkerExit(w, completedAbruptly);  
    }
}

runworkerメソッドは、実行前に最初にunlockし、中断を許可します!whileはnullではないと判断して実行を開始し、すぐにロックをかけます!この場合、他のスレッドの割り込みは許可されません.現在のスレッドの実行状態が判断され、終了されていないと判断した後、workerにカプセル化されたrunnableのrunメソッド、すなわちスレッドプールから転送されたタスクが実行され、現在のworkerのtaskおよびgetTask()が取得したタスクはnullでない方が繰り返し実行されます!タスクが実行できない場合は、最終的にprocessWorkerExit()スレッドを実行してメソッドを終了します.
次にgettaskメソッドがどのようにタスクを取得するかを見てみましょう!
 private Runnable getTask() {
       boolean timedOut = false; // Did the last poll() time out?

    for (; ; ) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {   //   null,   workerCount     
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;  //    ,  >                

        if ((wc > maximumPoolSize || (timed && timedOut))//               count>1    null                  workcount
                && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;//    workercount              !
        }

        try {
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://    poll
                    workQueue.take();  //false       
            if (r != null)  //  null  
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

gettaskメソッド初期化タイムアウトtagはfalse,1)はループに入り,タスクキューにタスクがある場合に最大スレッド数が設定した最大数を超えるとスレッドを減らそうと判断し始める.2』キューからタスクを取り、タスク個数>コアスレッド個数の場合、pollを呼び出してタスクキューworkQueueの先頭からrunnableを取り、タスク個数の場合
プロセスWorkerExit()スレッド終了メソッドの続行
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount(); //runworker       ,  workercount

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w); //       
    } finally {
        mainLock.unlock();
    }

    tryTerminate(); //    

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {    //    stop      shoutdown  running  
        if (!completedAbruptly) {       //          
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;//allowCoreThreadTimeOut       keeptimeout            ,min 0,        
            if (min == 0 && !workQueue.isEmpty()) //        min 0,            !        
                min = 1;
            if (workerCountOf(c) >= min)  //         ,  return   addworker()
                return; // replacement not needed
        }
        addWorker(null, false);//workercount          ,     worker  
    }
}


final void tryTerminate() {
    for (; ; ) {
        int c = ctl.get();
        if (isRunning(c) ||               //      shutdown   running,            shutdown,       null      return
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);   //      ONLY_ONE true         
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {  //TIDYING  
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));//,ctl   ”    ” 0           
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

総括1』completedAbruptlyがtrueに対して実行過程に異常が発生したことを説明した場合、直接workerスレッドを減らし、ロックし、workersキューからworker 2を除去する.ONEがtrueの場合は1つだけ終了します.そうでない場合、workersキューはすべて終了します.3」スレッドは停止する必要がありますか.スレッドステータスがTIDYINGの場合、すべてのタスクが終了したことを示します.ctlレコードの「タスク数」が0の場合、スレッドプールはTIDYINGステータスになります.terminated()4を実行します.スレッドステータスがrunningまたはshutdownの場合、現在のスレッドが突然終了した場合、addWorker()現在のスレッドが突然終了していない場合、現在のスレッド数<メンテナンスするスレッド数、addWorker()スレッドプールshutdown()を呼び出すと、workQueueが空前になるまでスレッドプールはcorePoolSizeスレッドを維持し、順次破棄されます.
まとめ:簡単に言えばスレッドプールはスレッドファクトリを通じていくつかのスレッドを作成し、これらのスレッド内でfor(;)を使用します.常にブロックキューからタスクを取り、excuteでコミットしたタスクが最も重い場合は、このブロックキューに入ります.タスクの実行中、スレッドとブロックキューが満額になった場合、コールバックが投げ出されます.RejectedExecutionHandler、ここではタスクを失うべきか、異常を投げ出すべきか、最も古いタスクを失うべきか、などを処理します.タスクの実行が完了すると、スレッドは順次終了します.