ソースコードの観点からスレッドプールの動作原理を解析する

23288 ワード

WeChatの公衆番号「バックエンドの進歩」は、バックエンド技術の共有に専念しています.Java、Golang、WEBフレームワーク、分散型ミドルウェア、サービスガバナンスなどです.運転手さんはお金を傾けて教えてくれて、あなたを連れて階段を上がって、説明する時間がありません.早く車に乗りなさい.
スレッドプールの構造パラメータとよく使われない設定を説明した後、一部の学生はスレッドプールの原理を深く理解し続けたいと思っています.だから、この文章では、ソースコードに深く入り込んで、底からスレッドプールの運行原理を食べます.
ThreadPoolExecutor
ソースコードに進む前に、J.U.Cパッケージのスレッドプールクラス図を見てみましょう.
最上位レベルはExecutorインタフェースで、メソッドは1つしかありません.
public interface Executor {
    void execute(Runnable command);
}

JavaスレッドプールはExecutorフレームワークとも呼ばれ、新しいタスクを実行する簡単な方法を提供します.
ExecutorServiceはExecutorを拡張し、shutDown()、shutDownNow()などのスレッドプールのライフサイクルを操作する方法と、submit()などの非同期トレース実行タスクによる戻り値Futureを生成する方法を拡張します.
ThreadPoolExecutorはAbstractExecutorServiceから継承され、同時にExecutorServiceインタフェースを実現し、Executorフレームワークのデフォルトのスレッドプール実装クラスでもあり、この文章の重点的な分析の対象でもあり、一般的にスレッドプールを使用して、特別な要求がなければ、直接ThreadPoolExecutorを作成し、スレッドプールを初期化し、特殊なスレッドプールが必要であれば、ThreadPoolExecutorを直接継承し、スケジュールされたタスクを実行するスレッドプールなどの特定の機能を実現します.
次に、ThreadPoolExecutorのソース分析を開始します(以下、JDK 8バージョン).
ctl変数
ctlはInteger値であり、スレッドプールの実行状態とスレッドプール内の有効スレッド数を制御するフィールドであり、Integer値は全部で32ビットであり、そのうち上位3ビットは「スレッドプール状態」、下位29ビットは「スレッドプール内のタスク数」を表す.Doug Lea大神がどのように実現したかを見てみましょう.
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
//               
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//                    
private static int workerCountOf(int c)  { return c & CAPACITY; }
//    ctl   
private static int ctlOf(int rs, int wc) { return rs | wc; }

スレッドプールには、次の5つのステータスがあります.
  • Running:スレッドプールの初期化時のデフォルトの状態は、スレッドが実行状態にあることを示し、新しくコミットされたタスクを受け入れることができ、同時にブロックキュー内のタスクを処理することができる.
  • SHUTDOWN:shutdown()メソッドを呼び出すと、スレッドプールがこの状態になり、新しいコミットされたタスクは続行されませんが、ブロックキュー内のタスクも処理されます.
  • STOP:shutdownNow()メソッドを呼び出すと、スレッドプールがこの状態になり、新しいコミットされたタスクを受け入れなくなり、ブロックキュー内のタスクを処理しなくなります.
  • TIDYING:スレッドプール内のworkerCount=0、すなわち有効スレッド数が0の場合、この状態に入ります.
  • TERMINTED:terminated()メソッドの実行後にこの状態に入りますが、terminated()メソッドは私たちが自分で実現する必要があります.

  • ビット演算を見てみましょう.
    COUNT_BITSはctl変数の有効スレッド数を示すビット数を表し、ここでCOUNT_BITS=29;
    CAPACITYは最大有効スレッド数を表し、ビット演算によりCOUNT_MASK=1111111111111111111111111111111で、これは10進数として約5億と計算され、設計当初から5億本を超えるスレッドを開かないことを考えていたので、十分でした.
    スレッドプールのステータスのビット演算では、次の値が得られます.
  • RUNNING:高3ビット値111
  • SHUTDOWN:高3ビット値000
  • STOP:高3ビット値001
  • TIDYING:高3ビット値010
  • TERMINTED:高3ビット値011
  • ここではDoug Lea大神がなぜ1つのInteger変数を使用して2つの値を表すのかを簡単に説明します.
    多くの人が考えているように、1つの変数が2つの値を表すと、ストレージスペースが節約されますが、ここは明らかにスペースを節約するために設計されたものではありません.この値を2つのInteger値に分割しても、1つのスレッドプールは4バイト増えただけで、この4バイトのために苦労して設計されています.明らかにDoug Lea大神の初心ではありません.
    マルチスレッド環境では、実行状態と有効スレッド数が統一されることを保証する必要があり、1つの変更があっても別の変更がない場合は、同じAtomicIntegerに配置し、AtomicIntegerの原子操作を利用すれば、この2つの値が常に統一されることを保証することができます.
    Doug Lea大神牛迫!
    Worker
    WorkerクラスはAQSを継承し、Runnableインタフェースを実装します.firstTaskとthreadの2つの重要なメンバー変数があります.FirstTaskは、最初に新規作成したタスクを保存するために使用されます.threadは、構築メソッドを呼び出すときにThreadFactoryによって作成されるスレッドであり、タスクを処理するためのスレッドである.
    スレッドプールにタスクを追加するにはどうすればいいですか?
    スレッドプールがタスクを実行するには、まずタスクを追加する必要があります.execute()はタスクを実行するという意味ですが、タスクを追加する手順も含まれています.次のソースコードです.
    java.util.concurrent.ThreadPoolExecutor#execute:
    public void execute(Runnable command) {
      //           ,      
      if (command == null)
        throw new NullPointerException();
      //   ctl 
      int c = ctl.get();
      // 1.                ,  addWorker    (            )
      if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
          return;
        c = ctl.get();
      }
      // 2.                 ,              ,            ,            
      if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
          reject(command);
        else if (workerCountOf(recheck) == 0)
          addWorker(null, false);
      }
      // 3.        ,   addWorker    (            )
      else if (!addWorker(command, false))
        //         ,         
        reject(command);
    }
    

    ソースコードの解読は「スレッドプールのパラメータを知っていますか?」の面接問題の解析と同じであることがわかります.ここでexecuteがタスクを実行するフローチャートを描きます.
    引き続き下を見ると、addWorkerはタスクを追加し、方法のソースコードが少し長いので、論理的に2つの部分に分割して説明します.
    java.util.concurrent.ThreadPoolExecutor#addWorker:
    retry:
    for (;;) {
      int c = ctl.get();
      //            
      int rs = runStateOf(c);
    
      //   rs  SHUTDOWN,                
      //   rs  SHUTDOWN,    firstTask  ,          ,       
      //             SHUTDOWN   ,              ,               
      if (rs >= SHUTDOWN &&
          ! (rs == SHUTDOWN &&
             firstTask == null &&
             ! workQueue.isEmpty()))
        return false;
    
      for (;;) {
        //         
        int wc = workerCountOf(c);
        //                        (       ),      
        //                    ,       
        //                     ,core=true          
        if (wc >= CAPACITY ||
            wc >= (core ? corePoolSize : maximumPoolSize))
          return false;
        //   AQS        
        if (compareAndIncrementWorkerCount(c))
          break retry;
        //       ctl   
        c = ctl.get();  // Re-read ctl
        //         ,     ,      
        if (runStateOf(c) != rs)
          continue retry;
        // else CAS failed due to workerCount change; retry inner loop
      }
    }
    

    ここで特に,firstTaskはスレッド実行を開始する最初のタスクであり,その後オンラインスレッドプールに常駐するスレッド実行のタスクはすべてブロックキューから取り出されることを強調し,注意が必要である.
    以上のforループコードの主な役割はctl変数の現在の状態がタスクを追加できるかどうかを判断することであり、特にスレッドプールがSHUTDOWN状態にある場合、ブロックキュー内のタスクを実行し続けることができるが、スレッドプールにタスクを追加し続けることはできないことを説明した.同時に作業スレッド数を増やすにはAQSを同期として使用し、同期に失敗した場合はループ実行を続行します.
    //        
    boolean workerStarted = false;
    //        
    boolean workerAdded = false;
    //      ,           Worker 
    Worker w = null;
    try {
      //     Worker
      w = new Worker(firstTask);
      //   Worker  Thread 
      final Thread t = w.thread;
      if (t != null) {
        //   workers HashSet           
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
          // Recheck while holding lock.
          // Back out on ThreadFactory failure or if
          // shut down before lock acquired.
          //             
          int rs = runStateOf(ctl.get());
          // rs < SHUTDOWN   RUNNING  ;
          //   rs RUNNING    rs SHUTDOWN    firstTask null,         。
          //    SHUTDOWN          ,      workQueue    
          // rs RUNNING   ,          
          //  rs  SHUTDOWN ,  firstTask  ,           ,     SHUTDOWN          
          if (rs < SHUTDOWN ||
              (rs == SHUTDOWN && firstTask == null)) {
            if (t.isAlive()) // precheck that t is startable
              throw new IllegalThreadStateException();
            workers.add(w);
            int s = workers.size();
            if (s > largestPoolSize)
              largestPoolSize = s;
            workerAdded = true;
          }
        } finally {
          mainLock.unlock();
        }
        //         
        if (workerAdded) {
          t.start();
          workerStarted = true;
        }
      }
    } finally {
      if (! workerStarted)
        addWorkerFailed(w);
    }
    return workerStarted;
    }
    

    以上のソースコードの主な役割は、Workerオブジェクトを作成し、新しいタスクをWorkerにロードし、同期をオンにしてWorkerをworkersに追加することです.ここでは、workersのデータ構造がHashSetであり、スレッドではなく安全であるため、workersを操作するには同期ロックが必要です.追加手順が完了するとスレッドを起動してタスクを実行し、下を見続けます.
    タスクの実行方法
    上記のコードに気づきました.
    //         
    if (workerAdded) {
      t.start();
      workerStarted = true;
    }
    

    ここでのtはw.threadによって得られたものであり、つまり、ThreadFactoryによって作成されたWorker内のタスクを実行するためのスレッドであり、Workerを生成する構造方法を見てみましょう.
    Worker(Runnable firstTask) {
      setState(-1); // inhibit interrupts until runWorker
      this.firstTask = firstTask;
      this.thread = getThreadFactory().newThread(this);
    }
    

    新ThreadのパラメータはWorker自体であり、WorkerはRunnableインタフェースを実現しているので、t.start()を実行すると、Workerのrun()メソッドが実行され、エントリが見つかりました.
    java.util.concurrent.ThreadPoolExecutor.Worker#run:
    public void run() {
      runWorker(this);
    }
    

    java.util.concurrent.ThreadPoolExecutor#runWorker:
    final void runWorker(Worker w) {
      Thread wt = Thread.currentThread();
      Runnable task = w.firstTask;
      w.firstTask = null;
      w.unlock(); // allow interrupts
      boolean completedAbruptly = true;
      try {
        //    workQueue            
        while (task != null || (task = getTask()) != null) {
          //                             
          w.lock();
          //          ,         
          if ((runStateAtLeast(ctl.get(), STOP) ||
               (Thread.interrupted() &&
                runStateAtLeast(ctl.get(), STOP))) &&
              !wt.isInterrupted())
            wt.interrupt();
          try {
            //             
            beforeExecute(wt, task);
            Throwable thrown = null;
            try {
              //     
              task.run();
            } catch (RuntimeException x) {
              thrown = x; throw x;
            } catch (Error x) {
              thrown = x; throw x;
            } catch (Throwable x) {
              thrown = x; throw new Error(x);
            } finally {
              //             
              afterExecute(task, thrown);
            }
          } finally {
            //  task   ,       getTask()   workQueue         
            task = null;
            //   Worker        
            w.completedTasks++;
            w.unlock();
          }
        }
        completedAbruptly = false;
      } finally {
        //       
        processWorkerExit(w, completedAbruptly);
      }
    }
    

    このステップは、タスクを実行するためのコア・メソッドです.空ではないfirstTaskタスクを初めて実行します.その後、workQueueブロック・キューからタスクを取得して実行します.タスクの実行前後に人に知られない小さな動作をしたい場合は、ThreadPoolExecutorを次の2つの方法で実現できます.
    protected void beforeExecute(Thread t, Runnable r) { }
    protected void afterExecute(Runnable r, Throwable t) { }
    

    これにより、タスクの実行をリアルタイムで監視できます.
    ここで、finallyブロックでは、スレッドがgetTask()メソッドを呼び出してworkQueueブロックキューからタスクを取得するためにtaskを空にします.
    コアスレッドが破棄されないことを保証するにはどうすればいいですか?
    スレッドプールでcorePoolSizeの数を維持できる常駐コアスレッドは、スレッドプールで回収されずにタスクを完了することをどのように保証しているのでしょうか.前の章では、workQueueキューからブロック的にタスクを取得するまで、タスクを取得していなければ、ずっとブロックされていて、頭がいいので、答えを知っています.今、Doug Lea大神がどのように実現したかを見てみましょう.
    java.util.concurrent.ThreadPoolExecutor#getTask:
    private Runnable getTask() {
      //     ,   false,    workQueue.poll()     ,    true
      //          ,     
      boolean timedOut = false;
    
      for (;;) {
        //   ctl   
        int c = ctl.get();
        int rs = runStateOf(c);
    
        //           SHUTDOWN,  workQueue              STOP
        //    AQS        ,    null,     
        //         SHUTDOWN    ,  workQueue   ,                  
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
          //   AQS            
          decrementWorkerCount();
          return null;
        }
    
        //              
        int wc = workerCountOf(c);
    
        //          allowCoreThreadTimeOut            corePoolSize,              
        // allowCoreThreadTimeOut   false,              
        //                   “  ”  ,         
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        //               :
        // 1.             maximumPoolSize,            setMaximumPoolSize  ,              ,                     ,           ,            maximumPoolSize,
        // 2.timed && timedOut    true,              ,   timedOut true,        workQueue.poll()     
        //         ,           
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
          //    AQS          
          if (compareAndDecrementWorkerCount(c))
            //        null,     
            return null;
          //       
          continue;
        }
    
        try {
          //   timed true,        ,        
          Runnable r = timed ?
            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
          workQueue.take();
          if (r != null)
            return r;
          //   poll         ,  timeOut   true
          //       ,          allowCoreThreadTimeOut,             
          timedOut = true;
        } catch (InterruptedException retry) {
          timedOut = false;
        }
      }
    }
    

    私はgetTask()メソッドのソースコードの深さ解析をソースコードに対応する場所に書きました.この方法はデフォルトの場合、コアスレッドが破棄されないコア実装を実現することです.
  • timedOutタイムアウトフラグをfalseにデフォルト設定します.
  • はスレッドの生死の大権を決定するtimedの値を計算し、(timed&&timedOut)はスレッドのタイムアウト回収の条件の一つであり、timedOutのデフォルト値がfalseであるため、pollタイムアウト取得の操作はまだ行われていないことに注意しなければならない.
  • は、タイムアウト値に基づいてタスクをブロックタイムアウトで取得するか、タスクをブロックして取得するかを決定し、ブロックタイムアウトでタスクを取得すると、タイムアウト後にtimedOutがtrueに設定され、ループが継続され、このとき(timed&&timedOut)がtrueとなり、スレッドタイムアウト回収を満たす.

  • 心血を注いだ1篇のソースコードの解読はこれで終わり、学生たちがスレッドプールの底の原理を徹底的に食べるのを助けることができることを望んで、後で面接官があなたにスレッドプールの問題を聞くことに出会って、あなたは「後端進級」のスレッドプールのソースコードの解読を見たことがあると言って、面接官はこの時あなたを褒めます:
    この学生の基礎は本当にしっかりしている.