Java Exector同時フレームワーク(二)剖析ThreadPool Exector実行過程

14623 ワード

前のページでは、全体としてExecutorインターフェースが紹介されています。前のページからExecutorフレームの最上位実装はThreadPoolExecutorクラスであることが分かりました。Executors工場クラスで提供されるnewScheduledThreadPoolnewFixedThreadPoolnewCachedThreadPool方法は、実はThreadPoolExecutorの構造関数パラメータが異なるだけです。異なるパラメータを導入することで、異なるアプリケーションシーンに適したスレッド池を構築することができますが、その底部原理はどのように実現されていますか?
1.スレッド池状態
運行過程を説明するなら、まず下のプログラムの状態を調べて何に分けますか?
volatile int runState;
static final int RUNNING    = 0;
static final int SHUTDOWN   = 1;
static final int STOP       = 2;
static final int TERMINATED = 3;
ThreadPoolExecutorコードには、上記のいくつかの変数が定義されています。一つのvolatile変数runStateと、他のいくつかの状態を表す定数が定義されています。ThreadPoolExecutor:初期状態は、現在のスレッド池の動作状態を表し、その値は上記の4つの定数値の一つである。runState:スレッド池は新しいタスクを受け付け、キュータスクを実行中…RUNNING:新たなタスクは受け付けませんが、待ち行列Queuedのタスクは続行されます。shutdown()メソッドを呼び出すと、RUNNIG->SHUTDOWNからSHUTDOWN:新たなタスクを受け付けないと同時にキューQueued中のタスクを実行しないと、実行中のタスクの終了を試みます。shutdown Now()メソッドを呼び出したら、STOPからSTOP:スレッド池中のスレッドはすでに停止しており、他の動作はSTOP状態と同じである。
  • 待ち行列とスレッド池が空きましたら、SHUTDOWN->TERMNATED
  • からなります。
  • スレッド池が空の場合はSTOP-TERMNATED
  • からなります。
    2.スレッド池の運転タスク
    2.1変数の紹介
    運転過程を説明する前に、まずTERMINATEDの中のいくつかの重要なメンバー変数を見ます。
    private final BlockingQueue<Runnable> workQueue; //      ,          ,  worker         
    private final ReentrantLock mainLock = new ReentrantLock(); //   poolSize, corePoolSize,maximumPoolSize, runState, and workers set         
    private final HashSet<Worker> workers = new HashSet<Worker>(); //            
    private volatile long  keepAliveTime; //  corePoolSize          
    private volatile boolean allowCoreThreadTimeOut; //   corePoolSize            
    private volatile int   corePoolSize; //     
    private volatile int   maximumPoolSize; //     (                 ,               )
    private volatile int   poolSize; //          
    private volatile RejectedExecutionHandler handler; //      
    private volatile ThreadFactory threadFactory; //    ,      
    private int largestPoolSize; //                 
    private long completedTaskCount; //         
    ここではThreadPoolExecutorcorePoolSizemaximumPoolSizeの2つの変数を重点的に説明し、これらの2つの変数はスレッドのプールにスレッドの個数を作成するためのポリシーに関する。workQueue:この変数はスレッド池のコアサイズとして理解できます。例を挙げて説明します。
  • には10人の労働者がいます。新しい任務が来たら、指導者は任務を割り当てて労働者に与えます。労働者は一人一人の任務しかできません。
  • 人は10人の労働者が忙しい時に、新しく来た任務は列の中に置かれます。
  • 任務が積み重なるほど労働者の任務スピードをはるかに超える時、指導者は一つの方法を考えました。他の部門から10人の労働者を借りてきて、借りる数量は一つの公式があります。そして、新しく来た任務を借りた労働者に割り当ててやります。
  • ですが、速度がまだ急でないなら、いくつかのタスクを放棄するための措置を取るかもしれません。一定時間待ってから、任務は全部完成しました。労働者が暇な場合、借りた10人の労働者を返すことを考えます。
  • 、つまりcorePoolSizeはスレッドの池の大きさです。maximPoolSizeはスレッドの池の救済措置の一つと思います。つまり、タスク量が突然大きい時の救済措置です。
  • 2.2スレッド実行プロセス
    先に次の文章の例を見てください。
    ExecutorService executor = Executors.newFixedThreadPool(3);
    
            IntStream.range(0, 6).forEach(i -> executor.execute(() -> {
                String threadName = Thread.currentThread().getName();
                System.out.println("finished: " + threadName);
            }));
    上のコードは6つのタスクを新規作成し、スレッドの池に投げて実行し、実行が完了するまでスレッド名を出力します。その中の最も核心的な方法はcorePoolSize方法であり、execute()も任務を実行することができるが、その下の部分もsubmit()方法を起動するので、execute()の実現原理を理解すれば良い。
    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {   //1.
                if (runState == RUNNING && workQueue.offer(command)) {    //2.
                    if (runState != RUNNING || poolSize == 0)   //3.
                        ensureQueuedTaskHandled(command);  //4.
                }
                else if (!addIfUnderMaximumPoolSize(command))  //5.
                    reject(command); // is shutdown or saturated //6
            }
        }
    上のコードは論理が複雑に見えるので、まず上の位置を見てみます。execute()は一つまたは表式で、二つの部分に分かれています。
  • は、まず、現在のスレッド数がコアスレッド数に等しいかどうかを判断し、そうでなければ直接if文ブロックに入る。そうでなければ、第二の部分
  • を判断する。
  • の第二の部分if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))は、スレッド数がコアスレッド数より小さい場合、新しいスレッド実行タスク(スレッド数がcorePoolSizeより小さいので、スレッドを直接に新規作成してタスクを実行し、現在のスレッドプールに空きがあるかどうかに関わらず)を実行する。新規作成に失敗したら、if文ブロックに入り、成功したらexecute方法は終了します。スレッドの新規作成に成功しましたので、タスクはオンラインで実行し始めました。
  • if文ブロックに入ると、上のコード2.addIfUnderCorePoolSize(command)を見ます。
  • は、現在のスレッドプール状態がRUNNIGであり、ジョブが待ち行列に入れられているかどうかを判断すると、直接if文ブロック
  • に進む。
  • そうでなければ、コード5.if (runState == RUNNING && workQueue.offer(command))で、新しいタスクが新しいスレッドで実行されるかどうかを判断します。
  • もし「借りた労働者」がまだ解決できないなら、任務拒否策を実行する。
    コードブロック3に進むif文ブロックif (!addIfUnderMaximumPoolSize(command))は、新しいタスクが待ち行列に追加されたため、この判断は、このタスクをタスクキャッシュキューに追加すると同時に他のスレッドが突然shutdownまたはshutdown Now方法を呼び出してスレッドを閉じるための緊急措置である。もしそうなら、応急処置に加入した新しいジョブif (runState != RUNNING || poolSize == 0)
    私たちは次の二つの重要な方法の実現を見ます。
    private boolean addIfUnderCorePoolSize(Runnable firstTask) {
            Thread t = null;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (poolSize < corePoolSize && runState == RUNNING)
                    t = addThread(firstTask);
            } finally {
                mainLock.unlock();
            }
            return t != null;
        }
    まずロックを取得します。ensureQueuedTaskHandled(command)を再判断しました。execute()の方法では、他のスレッドに新しいスレッドが追加されたり、shutdown、shutdown Nowメソッドが起動されたりするのを防ぐために、ここでダブルチェックの効果があると判断しました。if (poolSize < corePoolSize && runState == RUNNING)であれば、true新規スレッド実行タスクが実行される。addThread方法はスレッド工場を通じてスレッドthreadを作成し、Workersオブジェクトにカプセル化してワーカーの列に参加し、スレッドを実行することで、Workerオブジェクトをスレッドのオブジェクトとして見ることができます。
    private Thread addThread(Runnable firstTask) {
            Worker w = new Worker(firstTask);
            Thread t = threadFactory.newThread(w);
            boolean workerStarted = false;
            if (t != null) {
                w.thread = t;
                workers.add(w);
                int nt = ++poolSize;
                if (nt > largestPoolSize)
                    largestPoolSize = nt;
                try {
                    t.start();
                    workerStarted = true;
                }
            }
            return t;
        }
    ここでWorkカーの対象を紹介します。Runnableインターフェースを実現しました。Runnableの代理クラスとして使えばいいです。最終的にはそのrun方法を実行します。Workカー内のt = addThread(firstTask)beforeExecuteの方法に注意してください。この2つの方法はThreadPoolExectorでは具体的に実現されていないので、ユーザはこの方法と後のafter Execute方法とを書き換えて、あるタスクの実行時間などの統計情報を行うことができます。新しいスレッドの異常時に捕獲できないので、after Executに記録する必要があります。これはスプリングカットと似ていますか?知識は全部同じです。それのrun方法を見てください。
    public void run() {
                try {
                    hasRun = true;
                    Runnable task = firstTask;
                    firstTask = null;
                    while (task != null || (task = getTask()) != null) {  //1
                        runTask(task);
                        task = null;
                    }
                } finally {
                    workerDone(this);
                }
            }
    コードブロック1は、ここで循環的にタスクを取得し、タスクが全部実行されるまで実行することができます。最初のタスクを除いて、他のタスクはafterExecute方法で取りに行きます。この方法はThreadPool Exectorの一つの方法です。私たちは、クラス全体では、タスクキャッシュのキューにのみジョブが保存されています。キャッシュキューに取りに行きます。
    Runnable getTask() {
        for (;;) {
            try {
                int state = runState;
                if (state > SHUTDOWN)
                    return null;
                Runnable r;
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll(); //   
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //                            ,
                    //   poll   ,             ,   null
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                else
                    r = workQueue.take();
                if (r != null)
                    return r;
                if (workerCanExit()) {    //       , r null,      worker      
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();   //         worker
                    return null;
                }
                // Else retry
            } catch (InterruptedException ie) {
                // On interruption, re-check runState
            }
        }
    }
    ここには非常に巧妙な設計方式があります。もし私達がスレッドを設計すると、スレッドが空きを見つけたら、タスクキャッシュのキューから空きスレッドにタスクを渡して実行します。しかし、ここではこのような方式は採用されていません。これはタスクの割り当てスレッドを追加的に管理するため、知らず知らずのうちに難易度と複雑度を増加させます。ここでは、タスクを実行したスレッドWorkerを直接にタスクキャッシュのキューにタスクを取りに行かせます。
    2.addIfUnider MaximPoolSize
    この方法の実現思想とaddIfUder CorePoolSize方法の実現思想は非常に似ています。唯一の違いは、addIfUnder MaximPoolSize方法は、オンラインプログラムプールのスレッド数がコアプールサイズに達し、タスクキューにタスクを追加して失敗した場合に実行されます。
    private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
            Thread t = null;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (poolSize < maximumPoolSize && runState == RUNNING)
                    t = addThread(firstTask);
            } finally {
                mainLock.unlock();
            }
            return t != null;
        }
    ここに来て、大部分の友達は任務をスレッド池に提出してから実行されるまでの過程について基本的な理解があります。以下にまとめます。
  • まず、corePoolSizeとmaximPoolSizeの意味を明確にします。
  • の次に、Worketがどのような役割を果たしているかを知るべきです。
  • ジョブがスレッド池に提出された後の処理方針を知るには、ここでまとめると主に4つの点があります。
  • 現在のスレッド池のスレッド数がcorePoolSizeより小さい場合、各タスクはスレッドを作成してこのタスクを実行します。
  • は、現在のスレッドの数>=corePoolSizeであれば、タスクごとに、タスクキャッシュのキューに追加しようと試みられ、成功すれば、アイドルスレッドがそれを取り出して実行するのを待つ。追加に失敗した場合は、新しいスレッドを作成してこのタスクを実行してみます。
  • 現在のスレッド池のスレッド数がmaximPoolSizeに達すると、タスク拒否ポリシーを用いて処理される。
  • スレッド池のスレッド数がcorePoolSizeより大きい場合、あるスレッドの空き時間がkeep Alive Timeを超えると、スレッドは終了され、スレッド池のスレッド数がcorePoolSizeより大きくないまで止まる。コアプール内のスレッドに対して生存時間を設定することが許可されると、コアプール内のスレッドの空き時間がkeep Alive Timeを超えてスレッドも停止されます。
  • この記事を書き終わったら、後でタスクキャッシュのキューの種類がキャッシュされたポリシーやタスク拒否のポリシーなどを紹介します。もし文章に何か問題があれば、皆さんの指摘を歓迎します。