JAVA---スレッドプール(ソース分析)


問題は最高の先生です.
一、考える
  • 問題1:スレッドプールを使用する必要がある理由
  • 問題2:スレッドプールの動作原理?
  • 問題3:スレッドプールはどのようにスレッドを多重化しますか?
  • 問題4:タスクが実行されていない場合、スレッドプールのスレッド数はコアスレッド数に等しいですか、0に等しいですか.最大スレッド数をコアスレッド数に下げるにはどうすればいいですか?
  • 問題5:スレッドプールにはどのような状態がありますか.スレッドプールが停止した後、追加したタスクはどのように処理しますか.追加するタスクはどのように処理しますか.
  • 問題6:スレッドプール内のブロックキューとその実現原理は何ですか.どのように選択しますか.
  • 問題7:スレッドプールにどのような拒否ポリシーがあり、どのような違いがありますか?
  • 問題8:スレッドプールを使用する方法は何ですか?
  • 問題9:スレッドプールをどのように合理的に構成しますか?

  • 備考:問題は最後にまとめられ、ソース分析の過程で分析が挿入されます.
    二、ソース分析
    1.スレッドプールの状態と作業スレッド数(ctl分析)
    //                     ,  runStateOf(ctl)    ,workerCountOf(ctl)        。
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //32-3=29,      int  32      3   ,    。
    private static final int COUNT_BITS = Integer.SIZE - 3;
    //       :0001 1111 1111 1111 1111 1111 1111 1111,              ,  29          。
    private static final int CAPACITY = (1 << COUNT_BITS) - 1;
    // runState is stored in the high-order bits
    //        ,   int      。              ,           。
    //1110 0000 0000 0000 0000 0000 0000 0000, 111(       )。
    private static final int RUNNING    = -1 << COUNT_BITS;
    //0000 0000 0000 0000 0000 0000 0000 0000, 000。
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    //0010 0000 0000 0000 0000 0000 0000 0000, 001。
    private static final int STOP       =  1 << COUNT_BITS;
    //0100 0000 0000 0000 0000 0000 0000 0000, 010。
    private static final int TIDYING    =  2 << COUNT_BITS;
    //0110 0000 0000 0000 0000 0000 0000 0000, 011。
    private static final int TERMINATED =  3 << COUNT_BITS;

     
    // Packing and unpacking ctl
    //           :
    //1.X&1=X, 1    X     X。
    //2.X|0=X, 0    X      X。
    
    //  CAPACITY=0001 1111 1111 1111 1111 1111 1111 1111, ~CAPACITY=1110 0000 0000 0000 0000 0000 0000 0000 
    
    //  ctl         ,   。     1    。
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //  ctl        , 29 。     1    。
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    //                   。     2    。
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    2.コアフィールド
    //          ,                       ,                   ,      ,              。
    private volatile boolean allowCoreThreadTimeOut;
    
    //     allowCoreThreadTimeOut true                ,        ,               ,      。
    private volatile long keepAliveTime;
    
    //      。
    private volatile int corePoolSize;
    
    //      。
    private volatile int maximumPoolSize;
    
    //              ,             。
    private final BlockingQueue workQueue;
    
    //    ,        。
    private volatile ThreadFactory threadFactory;
    
    //    ,              ,                 。
    private volatile RejectedExecutionHandler handler;

    3.動作原理
    //       
    //command:      
    public void execute(Runnable command) {
    if (command == null)
    throw new NullPointerException();
    int c = ctl.get();
      //               ,            。
    if (workerCountOf(c) < corePoolSize) {
      //      ,  addWorker  。
    if (addWorker(command, true))
      //       ,    。
    return;
      //  
    c = ctl.get();
    }
    //             ,             ,          。
    if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
      //         ,         ,         ,             。
      //        ,           。
    if (! isRunning(recheck) && remove(command))
    reject(command);
      //       ,       0(             ,     ),    work        。
      //1.           。
      //2.         。           ,       。
    else if (workerCountOf(recheck) == 0)
      //addWorker      firstTask null      。
    addWorker(null, false);
    }
      //       ,           。              。           。          。
    else if (!addWorker(command, false))
    reject(command);
    }

     
    4.Workerクラス
    継承システムは次のとおりです.
    private final class Worker extends AbstractQueuedSynchronizer implements Runnable
  • 重要フィールド
  • /** Thread this worker is running in.  Null if factory fails. */
    //       。       。
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    //          。
    Runnable firstTask;
  • 構造関数
  • Worker(Runnable firstTask) {
        //         Worker。      interruptIfStarted  。
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        //        。
        this.thread = getThreadFactory().newThread(this);
    }
  • 重要な方法
  • public void run() {
        //       runWorker  。
        runWorker(this);
    }
    //  Worker。
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // state   0(        -1),    。
        w.unlock(); // allow interrupts
        //    。
        boolean completedAbruptly = true;
        try {
            //           ,    :
            //1.  firstTask   ,        。
            //2.firstTask  ,        ,        。    getTask()  ,          。
            //3.                          ,      。
            while (task != null || (task = getTask()) != null) {
                //  state 1,   。
                w.lock();
                //         ,           STOP        ,      。
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt(); //              ,         。
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //    。
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
    //       state    0   。
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
    //                        。
    private Runnable getTask() {
        //            。
        boolean timedOut = false; // Did the last poll() time out?
        //  。
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // Check if queue empty only if necessary.
           //       ,    STOP,            。
           //     SHUTDOWN,     ,    ,     。
           //             ,       。
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
    
            int wc = workerCountOf(c);
    
            // Are workers subject to culling?
            //         ,             。
            //1.            。
            //2.             (                           )。
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
            //        ,          :
            //1.                             。
            //2.       1     。
            //   timedOut        true,      keepAliveTime        ,    。 keepAliceTime   ,    true。
            //              ,     due to a call to setMaximumPoolSize,           ,            。         。
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                //          ,  CAS     ,            CAS  ,    。        CAS                  ,       allowCoreThreadTimeOut true,         。 
                continue;
            }
    
            try {
                //              :
                //1.     。
                //2.         ,         。
                //3.            (              )。
                //                           。
                //poll:  keepAliveTime  ,           ,     。
                //take:          ,     。
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                //         ,        ,  timeOut true,    ,         ,       。
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

    5.スレッドプールのクローズ
    //     ,      SHUTDOWN  。
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //    ,JDK         。
            checkShutdownAccess();
           //        SHUTDOWN。
            advanceRunState(SHUTDOWN);
            //       ,           。
            interruptIdleWorkers();
            //    ,             。
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        //       。          。
        tryTerminate();
    }
    // worker.tryLock   ,  worker      
    //  worker       , worker       ( state 1),  runWorker  ,    lock  ,  state 1。
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        //    
                        t.interrupt();//            。
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
    //           TERMINATED  。
    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            //        ,        。
            //  :     RUNING,            。
            //1.    STOP      (      0)。
            //2.    SHUTDOWN          。
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
    
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        //   。
                        terminated();
                    } finally {
                        //     。
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
    //     ,    STOP  。
    public List shutdownNow() {
        List tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            //  shutdown  ,        ,        , shutdown    idle  。
            interruptWorkers();
            //            。
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        //   ,       。
        tryTerminate();
        return tasks;
    }

    6.スレッドプールの拒否ポリシー
    スレッドプールには4つの拒否ポリシーが用意されています.RejectedExecutionHandlerインタフェースを実装し、rejectedExecutionメソッドを書き換えます.デフォルトの拒否ポリシーはAbortPolicyです.次に、さまざまなポリシーの実装方法と違いを見てみましょう.
    AbortPolicy
    //      
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException("Task " + r.toString() +
                                                     " rejected from " +
                                                     e.toString());
    }

    CallersRunPolicy
    //        ,               。
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    r.run();
                }
    }

    DiscardOldstPolicy
    //        ,            ,            
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    e.getQueue().poll();
                    e.execute(r);
                }
    }

    DiscardPolicy
    //         
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }

    三、思考の総括
  • 問題1:スレッドプールを使用する必要がある理由

  • 答:主に:
    1.スレッドを多重化します.スレッドの頻繁な作成と破棄によるオーバーヘッドを回避します.
    2.スレッドをよりよく管理し、システムを制御できるようにする.
  • 問題2:スレッドプールの動作原理?

  • 答え:ソースコードで分析しました.自分でソース分析に基づいてフローチャートを描くことができます.
  • 問題3:スレッドプールはどのようにスレッドを多重化しますか?

  • 答え:具体的にはgetTask()メソッドを参照してください.まず、allowCoreThreadTimeOutパラメータを設定したかどうかによって決まります.trueに設定した場合、スレッドが常に多重化されることはできません.このパラメータを設定すると、コアスレッドがキュー内でkeepAliveTime時間を待ってもタスクが取得されず、コアスレッドは終了します.このパラメータが設定されていない場合、コアスレッドはBlockQueueを通過する.take()メソッドは、タスクが達成されるのを待つのをブロックし続け、タスクが達成されると、デッドサイクルによって取得を継続し、多重化の目的を達成する.もう一つはWorkerを1回だけ呼び出したことです.thread.start()は、各タスクの実行はRunnableを呼び出すrun()メソッドによって実行されます.
  • 問題4:タスクが実行されていない場合、スレッドプールのスレッド数はコアスレッド数に等しいですか、0に等しいですか.最大スレッド数をコアスレッド数に下げるにはどうすればいいですか?

  • A:allowCoreThreadTimeOutをtrueに設定すると、0になります.そうしないと、コアスレッド数になります.最大スレッド数をコアスレッド数に下げるには、上のソースコード分析の過程で説明したように、主にgetTask()方法で、スレッドがタイムアウトを許可するかどうかを判断するには、wc>corePoolSize、すなわち、作業スレッド数がコアスレッド数より大きい、すなわち、この場合、現在のスレッド変化がタイムアウトし、最終的に終了する条件がある.
  • 問題5:スレッドプールにはどのような状態がありますか.スレッドプールが停止した後、追加したタスクはどのように処理しますか.追加するタスクはどのように処理しますか.

  • ソース解析により,スレッドがshutdown()メソッドとshutdownNow()メソッドの2つの方法でスレッドプールを終了できることを見出した.スレッドは、それぞれSHUTDOWNおよびSTOP状態に置かれる.正常な状態はRUNNING状態です.3つの状態から分析できます
    1.RUNNING:正常な状態で、正常なスレッドプールでプロセスを実行する.
    2.SHUTDOWN:クローズ状態で、新しくコミットされたタスクは拒否されますが、ブロックされたキューにすでに存在するタスクが実行されます.
    3.STOP以上:ステータスを終了し、新しいタスクを拒否し、実行中であってもすべてのスレッドを中断し、キューを空にします.
  • 問題6:スレッドプール内のブロックキューとその実現原理は何ですか.どのように選択しますか.

  • 具体的には博文:【ブロックキュー】(https://blog.csdn.net/qq_31331965/article/details/100940842).
  • 問題7:スレッドプールにどのような拒否ポリシーがあり、どのような違いがありますか?

  • 文スレッドプール実行ポリシーを参照してください.
  • 問題8:スレッドプールを使用する方法は何ですか?

  • 1.ThreadPoolExecutorコンストラクション関数を使用してスレッドプールを手動で作成します.
    2.Executorsの静的方法によって特定の需要のスレッドプールを構築する.具体的には、ブログ:Executrosクラスの使用を参照してください.
  • 問題9:スレッドプールをどのように合理的に構成しますか?

  • 具体的には参考博文を参照してください.http://ifeve.com/how-to-calculate-threadpool-size/.