スレッドプールの使用

8094 ワード

スレッドプール実装クラスThreadPoolExecutor
ExecutorServiceインタフェースはExecutorインタフェースを拡張し、ライフサイクルの管理方法を追加しました.ThreadPoolExecutorはAbstractExecutorServiceを継承し、AbstractExecutorServiceはExecutorServiceインタフェースを実現しました.
ThreadPoolExecutorの汎用コンストラクション関数は次のとおりです.
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

 
 
  • corePoolSize(スレッドプールの基本サイズ):スレッドプールにタスクをコミットすると、スレッドプールはスレッドを作成してタスクを実行します.他の空き基本スレッドが新しいタスクを実行できてもスレッドが作成され、実行するタスクの数がスレッドプールの基本サイズより大きいまで作成されません.スレッドプールのprestartAllCoreThreadsメソッドが呼び出されると、スレッドプールは事前にすべての基本スレッドを作成して起動します.
  • runnableTaskQueue(タスクキュー):実行待ちのタスクを保存するためのブロックキュー.次のブロックキューを選択できます.
  • ArrayBlockingQueue:配列構造に基づく境界ブロックキューであり、このキューはFIFO(先進先出)の原則に従って要素をソートします.
  • LinkedBlockingQueue:チェーンテーブル構造に基づくブロックキューです.このキューはFIFO(先進先出)で要素をソートし、スループットは通常ArrayBlockingQueueより高いです.静的工場法Executors.新FixedThreadPool()はこのキューを使用しています.
  • SynchronousQueue:要素を格納しないブロックキュー.各挿入操作は別のスレッドが削除操作を呼び出すまで待たなければならない.そうしないと、挿入操作はずっとブロック状態にあり、スループットは通常LinkedBlockingQueueより高く、静的ファクトリメソッドExecutors.新CachedThreadPoolはこのキューを使用しています.
  • PriorityBlockingQueue:優先度のある無限のブロックキュー.

  • maximumPoolSize(スレッドプールの最大サイズ):スレッドプールが作成できる最大スレッド数.キューがいっぱいで、作成したスレッド数が最大スレッド数より小さい場合、スレッドプールは新しいスレッド実行タスクを作成します.境界のないタスクキューというパラメータを使用すると効果がないことに注意してください.
  • ThreadFactory:スレッドを作成するファクトリを設定します.スレッドファクトリを使用して、作成したスレッドごとにより意味のある名前を設定できます.
  • RejectedExecutionHandler(飽和ポリシー):キューとスレッドプールがいっぱいになった場合、スレッドプールが飽和状態にあることを示すには、コミットされた新しいタスクを処理するポリシーが必要です.このポリシーのデフォルトはAbortPolicyで、新しいタスクを処理できない場合に例外を放出することを示します.以下、JDK 1.5提供される4つのポリシー.
  • AbortPolicy:異常を直接放出します.
  • CallerRunsPolicy:呼び出し元のスレッドのみでタスクを実行します.
  • DiscardOldestPolicy:キュー内の最近のタスクを破棄し、現在のタスクを実行します.
  • DiscardPolicy:処理せず、廃棄します.
  • はもちろん、RejectedExecutionHandlerインタフェースのカスタムポリシーをアプリケーションシーンの必要に応じて実現することもできる.ログを記録したり、処理できないタスクを永続化したりします.

  • keepAliveTime(スレッドアクティビティ保持時間):スレッドプールのワークスレッドが空き、生存時間を維持します.したがって、タスクが多く、各タスクの実行時間が短い場合は、この時間を大きくしてスレッドの利用率を高めることができます.
  • TimeUnit(スレッドアクティビティ保持時間の単位):オプションの単位は、日(DAYS)、時間(HOURS)、分(MINUTES)、ミリ秒(MILLISECONDS)、マイクロ秒(MICROSECONDS、千分の一ミリ秒)、ミリ秒(NANOSECONDS、千分の一マイクロ秒)です.

  • スレッドプールの作成
    Executorsクラスにはいくつかの静的な工場があり、いくつかの一般的なスレッドプールを生成します.Executorsの戻り値のタイプはExecutorServiceで、その中にはnew ThreadPoolExecutorによって実現されています.ExecutorServiceはThreadPoolExecutorのインタフェースで、ExecutorsのnewFixedThreadPoolメソッドなどです.
        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }

    Executorsでは、スレッドを作成する方法をいくつか用意しています.
  • newSingleThreadExecutor:単一スレッドのスレッドプールを作成します.このスレッドプールでは、1つのスレッドのみが動作します.つまり、1つのスレッドがすべてのタスクをシリアルで実行することに相当します.この唯一のスレッドが異常に終了した場合、新しいスレッドが置き換えられます.このスレッドプールは、すべてのタスクの実行順序がタスクのコミット順序で実行されることを保証します.
  • newFixedThreadPool:固定サイズのスレッドプールを作成します.タスクをコミットするたびに、スレッドがスレッドプールの最大サイズに達するまでスレッドが作成されます.スレッドプールのサイズは、最大値に達すると変更されず、実行異常によってスレッドが終了すると、スレッドプールに新しいスレッドが追加されます.
  • newCachedThreadPool:キャッシュ可能なスレッドプールを作成します.スレッドプールのサイズがタスクの処理に必要なスレッドを超えると、空き(60秒でタスクを実行しない)のスレッドの一部が回収され、タスク数が増加すると、このスレッドプールはスマートに新しいスレッドを追加してタスクを処理することができます.このスレッドプールはスレッドプールのサイズに制限はありません.スレッドプールのサイズは、オペレーティングシステム(またはJVM)が作成できる最大スレッドサイズに完全に依存します.
  • newScheduledThreadPool:無限サイズのスレッドプールを作成します.このスレッドプールは、Timerに相当するタイミングおよび周期的なタスク実行の要件をサポートします.
  • newSingleThreadScheduledExecutor:単一スレッドのスレッドプールを作成します.このスレッド・プールは、タイミングおよび周期的なタスク実行の要件をサポートします.

  • スレッドプールへのタスクのコミット
    Executerインタフェースはvoid execute(Runnable command)メソッドを使用してスレッドプールにタスクをコミットし、executeメソッドは値を返さず、スレッドプールによってタスクが成功したかどうかを判断できません.
    ExecutorServiceインタフェースには、スレッドタスクをコミットするための次の方法が追加されています.
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);

    それらの違いはRunnableが実行後に結果がなく、Callableが実行後に結果があることです.これは、複数のスレッドでステータスと結果を伝達するのに非常に役立ちます.また、彼らの同じ点は、Futureオブジェクトを返すことです.Futureオブジェクトは、実行が完了するまでスレッドをブロックしたり(結果を取得したり、もしあれば)、タスクの実行をキャンセルしたりすることができます.もちろん、タスクがキャンセルされたかどうか、または実行が完了したかどうかを検出することもできます.Futureがない前にスレッドが完了したかどうかを検出し、通常Threadを使用します.join()またはデッドサイクルとステータスビットを用いてスレッドの実行が完了したことを記述する.スレッドをブロックしたり、タスクの実行が完了したり、実行中または実行が開始されていないタスクを検出したりするより良い方法があります.
    次に、スレッドプール内のスレッドが受信要求として使用されるネットワークサービスの簡単な構造を示す.事前に構成するExecutorsを使用しています.新FixedThreadPool(int)工場方法:
     class NetworkService implements Runnable {
        private final ServerSocket serverSocket;
        private final ExecutorService pool;
    
        public NetworkService(int port, int poolSize)
            throws IOException {
          serverSocket = new ServerSocket(port);
          pool = Executors.newFixedThreadPool(poolSize);
        }
     
        public void run() { // run the service
          try {
            for (;;) {
              pool.execute(new Handler(serverSocket.accept()));
            }
          } catch (IOException ex) {
            pool.shutdown();
          }
        }
      }
    
      class Handler implements Runnable {
        private final Socket socket;
        Handler(Socket socket) { this.socket = socket; }
        public void run() {
          // read and service request on socket
        }
     }
    
     
    タイマのスレッドプール
    ExecutorsのnewScheduledThreadPool()メソッドとnewSingleThreadScheduledExecutor()メソッドは、ScheduledExecutorServiceタイプオブジェクトを返します.
    P u b l ic interface S h e d u l edExecutorService extends ExecutorService所定の遅延後に実行または定期的に実行されるコマンドをスケジュールするExecutorService.ScheduledExecutorServiceの実装クラスは次のとおりです.
    public class ScheduledThreadPoolExecutorextends ThreadPoolExecutorimplements ScheduledExecutorService
        public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }

    ScheduledExecutorServiceが提供する方法は次のとおりです.
     <V> ScheduledFuture<V> 
     schedule(Callable<V> callable, long delay, TimeUnit unit) 
                             ScheduledFuture。 
     ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) 
                                 。 
     ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) 
                                      ,           ;      initialDelay      ,    initialDelay+period    ,    initialDelay + 2 * period    ,    。 
     ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) 
                                      ,  ,                          。 
    

    次の方法では、ScheduledExecutorServiceを設定し、1時間に10秒ごとにビープを鳴らします.
     import static java.util.concurrent.TimeUnit.*;
     class BeeperControl {
        private final ScheduledExecutorService scheduler = 
           Executors.newScheduledThreadPool(1);
    
        public void beepForAnHour() {
            final Runnable beeper = new Runnable() {
                    public void run() { System.out.println("beep"); }
                };
            final ScheduledFuture<?> beeperHandle = 
                scheduler.scheduleAtFixedRate(beeper, 10, 10, SECONDS);
            scheduler.schedule(new Runnable() {
                    public void run() { beeperHandle.cancel(true); }
                }, 60 * 60, SECONDS);
        }
     }