Javaスレッドプールの作成の分析

9362 ワード

最近、プロジェクトの同時機能を改善していますが、開発がぶつかっています.たくさんの資料を見て、やっと認識を深めた.そこでソースコードの表示に合わせて,同時プログラミングの原理をまとめるつもりである.
最も多く使用されるスレッドプールから、スレッドプールのライフサイクル全体を認識する実装原理を作成、実行、閉じる準備をします.原子変数、同時コンテナ、ブロックキュー、同期ツール、ロックなどのトピックを後で検討します.java.util.concurrentのコンカレントツールは難しくありませんが、使うだけではありません.read the fucking source codeをください.はは.ちなみに私が使っているJDKは1.8です.

Executorフレームワーク


Executerはスレッドプール管理フレームワークで、インタフェースには1つの方法executeしかなく、Runnableタスクを実行します.ExecutorServiceインタフェースはExecutorを拡張し、スレッドライフサイクルの管理を追加し、タスクの終了、タスクの結果を返すなどの方法を提供します.AbstraactExecutorServiceはExecutorServiceを実装し、submitメソッドなどのデフォルト実装ロジックを提供します.
そして今日のトピックThreadPoolExecutorでは、AbstractExecutorServiceを継承し、スレッドプールの具体的な実装を提供します.

構築方法


以下はThreadPoolExecutorの最も一般的な構造関数で、最大7つのパラメータがあります.具体的なコードは貼られていません.パラメータの検証と設定の文だけです.
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
    }

corePoolSizeは、スレッドプールのターゲットサイズであり、コアスレッド数とも呼ばれます.maximumPoolSizeはスレッドプールの最大上限であり、maximumPoolSizeからcorePoolSize、すなわち非コアスレッド数、またはアイドルスレッドを減算します.keepAliveTimeは,空きスレッドの生存時間を示し,生存時間を超えた空きスレッドは回収される.unitは言うまでもなく、残りの3つのパラメータは後文の分析を見ます.

あらかじめ設定されたカスタムスレッドプール


ThreadPoolExecutorは、Executorsのファクトリメソッドによって作成されるカスタマイズされたスレッドプールをプリセットしています.以下、newSingleThreadExecutor、newFixedThreadPool、newCachedThreadPoolの作成パラメータを分析する.
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue());
    }

新FixedThreadPoolのcorePoolSizeとmaximumPoolSizeはいずれも着信固定数に設定され、keepAliveTimは0に設定されています.スレッドプールが作成されると、スレッドの数は固定され、スレッドが安定している必要がある場合に適しています.
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue()));
    }

NewSingleThreadExecutorはスレッド数が1に固定されたnewFixedThreadPoolバージョンで、プール内のタスクのシリアル化を保証します.F i n a l i z a b e D e l egatedExecutorServiceが返されていることに気づき、ソースコードを見てみましょう.
static class FinalizableDelegatedExecutorService
        extends DelegatedExecutorService {
        FinalizableDelegatedExecutorService(ExecutorService executor) {
            super(executor);
        }
        protected void finalize() {
            super.shutdown();
        }
    }

F i n a l i z a b eDelegatedExecutorServiceはDelegatedExecutorServiceを継承しており、gc時にスレッドプールを閉じる操作を増やすだけで、DelegatedExecutorServiceのソースコードを見てみましょう.
    static class DelegatedExecutorService extends AbstractExecutorService {
        private final ExecutorService e;
        DelegatedExecutorService(ExecutorService executor) { e = executor; }
        public void execute(Runnable command) { e.execute(command); }
        public void shutdown() { e.shutdown(); }
        public List shutdownNow() { return e.shutdownNow(); }
        public boolean isShutdown() { return e.isShutdown(); }
        public boolean isTerminated() { return e.isTerminated(); }
        //...
    }

コードは簡単で、D e l e gatedExecutorServiceはExecutorServiceをパッケージ化し、ExecutorServiceのメソッドのみを露出させるため、スレッドプールのパラメータを構成することはできません.本来,スレッドプール作成のパラメータは調整可能であり,ThreadPoolExecutorはsetメソッドを提供している.newSingleThreadExecutorを使用して、単一スレッドシリアルのスレッドプールを生成することを目的としていますが、スレッドプールサイズを設定できるなら、面白くありません.
Executorsでは、通常のスレッドプールを構成不可能なスレッドプールにパッケージするunconfigurableExecutorServiceメソッドも提供されています.スレッドプールが不明なので後で変更されたくない場合は、このメソッドを呼び出すことができます.
newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue());
    }

NewCachedThreadPoolはキャッシュできるスレッドプールを生成し、スレッド数は0からIntegerまで可能である.MAX_VALUE、タイムアウト時間は1分です.スレッドプールの効果は、空きスレッドがある場合、スレッドが多重化されます.空きスレッドがない場合は、新しいスレッドが作成されます.スレッドが1分以上空いている場合は、回収されます.
newScheduledThreadPool
新ScheduledThreadPoolは、タスクをタイミングよく実行できるスレッドプールを作成します.これは本文で展開するつもりはなく、後で別の文章で詳しく話します.

待ち行列


NewCachedThreadPoolのスレッド上限はほぼ無限に等しいが,システムリソースは限られており,タスクの処理速度はタスクのコミット速度に及ばない可能性がある.したがって、スレッド不足で待機しているRunnableタスクを保存するために、ThreadPoolExecutorにブロックキューを提供することができます.これがBlockingQueueです.
JDKはBlockingQueueにいくつかの実現方式を提供し、よく使われているのは以下の通りである.
  • ArrayBlockingQueue:配列構造のブロックキュー
  • LinkedBlockingQueue:チェーンテーブル構造のブロックキュー
  • PriorityBlockingQueue:優先順位のあるブロックキュー
  • SynchronousQueue:要素が格納されないブロックキュー
  • NewFixedThreadPoolおよびnewSingleThreadExecutorは、デフォルトでは無境界のLinkedBlockingQueueを使用します.タスクがコミットされているにもかかわらず、スレッドプールがタイムリーに処理されない場合、待機キューが無制限に長くなり、システムリソースが消費されることに注意してください.したがって、リソースの消費を回避するために、境界のある待機キューを使用することをお勧めします.しかし、一つの問題を解決すると、また新しい問題をもたらします.列が埋まってから、新しい任務に来ます.この時はどうしますか.キュー飽和の処理方法については、後述します.
    新CachedThreadPoolで使用されているSynchronousQueueは、名前がキューであるにもかかわらず要素を格納できないのが興味深い.1つのタスクをキューに入れるには、別のスレッドがこのタスクを受信する必要があります.1つのエントリには1つのエントリがあり、キューには何も格納されません.したがって、SynchronousQueueは移行メカニズムであり、キューとは言えません.NewCachedThreadPoolは上限のないスレッドプールを生成し,理論的にはどれだけのタスクをコミットしてもよいが,SynchronousQueueを待機キューとして使用するのが適切である.

    ほうわせんりゃく


    境界のある待機キューがいっぱいになると、飽和ポリシーを用いて処理する必要があり、ThreadPoolExecutorの飽和ポリシーは、RejectedExecutionHandlerに伝達されることによって実現される.コンストラクション関数の入力がない場合は、デフォルトのdefaultHandlerが使用されます.
    private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
    
    public static class AbortPolicy implements RejectedExecutionHandler {
           public AbortPolicy() { }
           public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
               throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
           }
       }
    

    AbortPolicyはデフォルトの実装であり、RejectedExecutionException異常を直接投げ出し、呼び出し者が自分で処理できるようにします.それ以外にもいくつかの飽和戦略があります.見てみましょう.
       public static class DiscardPolicy implements RejectedExecutionHandler {
           public DiscardPolicy() { }
           public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
           }
       }
    

    DiscardPolicyのrejectedExecutionは直接空の方法で、何もしません.列がいっぱいになったら、後続の任務はすべて捨てます.
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public DiscardOldestPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
    

    DiscardOldestPolicyは待ち行列の中で最も古いタスクを蹴り飛ばし、新しいタスクを実行させます.
        public static class CallerRunsPolicy implements RejectedExecutionHandler {
            public CallerRunsPolicy() { }
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    r.run();
                }
            }
        }
    

    最後の飽和戦略は、新しいタスクも古いタスクも捨てず、現在のスレッドで直接このタスクを実行するCallerRunsPolicyです.現在のスレッドは一般的にメインスレッドですね.メインスレッドにタスクを実行させ、ブロックされるかもしれません.一連の案をはっきり考えなければ、このような策略を少なくしたほうがいい.

    ThreadFactory


    スレッドプールが新しいスレッドを作成する必要があるたびに、スレッドファクトリで取得されます.ThreadPoolExecutorにスレッドファクトリを設定しない場合は、デフォルトのdefaultThreadFactoryが使用されます.
    public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
    }
    
    static class DefaultThreadFactory implements ThreadFactory {
           private static final AtomicInteger poolNumber = new AtomicInteger(1);
           private final ThreadGroup group;
           private final AtomicInteger threadNumber = new AtomicInteger(1);
           private final String namePrefix;
    
           DefaultThreadFactory() {
               SecurityManager s = System.getSecurityManager();
               group = (s != null) ? s.getThreadGroup() :
                                     Thread.currentThread().getThreadGroup();
               namePrefix = "pool-" +
                             poolNumber.getAndIncrement() +
                            "-thread-";
           }
    
           public Thread newThread(Runnable r) {
               Thread t = new Thread(group, r,
                                     namePrefix + threadNumber.getAndIncrement(),
                                     0);
               if (t.isDaemon())
                   t.setDaemon(false);
               if (t.getPriority() != Thread.NORM_PRIORITY)
                   t.setPriority(Thread.NORM_PRIORITY);
               return t;
           }
       }
    

    スレッドプール内のスレッドのnameを通常印刷するとpool-1-thread-1のような名前が出力されますが、ここで設定します.このデフォルトのスレッドファクトリでは、作成されたスレッドは通常の非デーモンスレッドであり、カスタマイズが必要な場合は、ThreadFactoryを実装した後、ThreadPoolExecutorに転送すればよい.
    コードを見ずにまとめないと分からないが,スレッドプールの作成だけで多くの学問を引き出すことができる.スレッドプールを通常作成するのはコードのことではありませんが、ThreadPoolExecutorは柔軟なカスタマイズ方法を提供しています.
    メッセージと転送を歓迎します.次はスレッドプールがどのようにタスクを実行するかを分析するつもりです.