スレッド池ThreadPool Exector使用概要と方法例


一、概要
スレッド池類はjava.util.concurrent.ThreadPoolExecutorで、一般的な構造方法は:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
long keepAliveTime, TimeUnit unit, 
BlockingQueue workQueue, 
RejectedExecutionHandler handler) 
  • corePoolSize:スレッド池メンテナンススレッドの最小数
  • maximPoolSize:スレッド池メンテナンススレッドの最大数
  • keep Alive Time:スレッド池メンテナンススレッドによって許可される空き時間
  • unit:スレッドメンテナンススレッドが許可する空き時間の単位
  • workQue:スレッド池に使用されるバッファ
  • handler:スレッド池の拒否タスクに対する処理戦略
  • 一つのタスクはexecute(Runnable)方法でスレッドプールに追加され、タスクはRunnableタイプのオブジェクトであり、タスクの実行方法はRunnableタイプのオブジェクトのrun()方法である。
    タスクがexecute(Runnable)方法によってスレッドプールに追加されたい場合:
  • この時、スレッド池の数がcorePoolSizeより小さい場合、スレッドプールのスレッドがアイドル状態であっても、追加されたタスクを処理するために新しいスレッドを作成します。
  • この時点でスレッド池の数がcorePoolSizeに等しいが、バッファキューのworkQueが満杯していない場合、ジョブはバッファキューに入れられる。
  • この時スレッド池の数がcorePoolSizeより大きい場合、バッファキューworkQueがいっぱいになり、スレッド池の数がmaximPoolSizeより小さい場合、追加されたジョブを処理するために新しいスレッドを建設する。
  • この時スレッド池の数がcorePoolSizeより大きい場合、バッファキューworkQueがいっぱいであり、スレッド池の数がmaximPoolSizeに等しい場合、handlerによって指定されたポリシーによりこのタスクを処理する。
  • つまり、ジョブを処理する優先度は以下の通りです。
    コアスレッドcorePoolSize、タスクキューworkQue、最大スレッドmaximPoolSize、3つが満杯の場合、handlerを使って拒否されたタスクを処理します。
    スレッド池中のスレッド数がcorePoolSizeより大きい場合、あるスレッドの空き時間がkeep Alive Timeを超えると、スレッドは終了されます。このように、スレッド池は、プール内のスレッド数を動的に調整することができる。
    unitオプションのパラメータは、java.util.co ncurrent.TimeUnitのいくつかの静的属性である。
    NANOSECONDS、MICROSCONS、MILLISECONS、SECONDS。
    私がよく使うのは、java.util.concurrent.ArrayBlockingQueue です。
    ハンドルには四つの選択があります。
  • ThreadPool Exector.AbortPolicy()java.util.co ncurrent.RejectExecution Exception異常
  • を投げる
  • ThreadPool Exector.ClerRuns Policy()は現在のタスクを再試行し、彼は自動的にexecute()メソッド
  • を呼び出します。
  • ThreadPoolExector.DiscrdOldest Policy()古いタスクを捨てる
  • ThreadPoolExector.DiscrdPolicy()現在のタスクを破棄する
  • 二、一般的な用法例
    
    package demo;
    import java.io.Serializable;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    public class TestThreadPool2
    {
      private static int produceTaskSleepTime = 2;
      private static int produceTaskMaxNumber = 10;
      public static void main(String[] args)
      {
        //        
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3),
            new ThreadPoolExecutor.DiscardOldestPolicy());
        for (int i = 1; i <= produceTaskMaxNumber; i++)
        {
          try
          {
            //       ,         
            String task = "task@ " + i;
            System.out.println("put " + task);
            threadPool.execute(new ThreadPoolTask(task));
            //     ,      
            Thread.sleep(produceTaskSleepTime);
          }
          catch (Exception e)
          {
            e.printStackTrace();
          }
        }
      }
    }
    /**
     *         
     */
    class ThreadPoolTask implements Runnable, Serializable
    {
      private static final long serialVersionUID = 0;
      private static int consumeTaskSleepTime = 2000;
      //           
      private Object threadPoolTaskData;
      ThreadPoolTask(Object tasks)
      {
        this.threadPoolTaskData = tasks;
      }
      public void run()
      {
        //       ,           ,         
        System.out.println(Thread.currentThread().getName());
        System.out.println("start .." + threadPoolTaskData);
        try
        {
          // //    ,      
          Thread.sleep(consumeTaskSleepTime);
        }
        catch (Exception e)
        {
          e.printStackTrace();
        }
        threadPoolTaskData = null;
      }
      public Object getTask()
      {
        return this.threadPoolTaskData;
      }
    }
    説明:
    1、このプログラムでは、一つのタスクはRunnableタイプのオブジェクト、つまりThreadPoolTaskタイプのオブジェクトです。
    2、一般的にタスクは処理方式の他に処理が必要なデータがあり、処理したデータは構造方法によりタスクに転送されます。
    3、このプログラムの中で、main()の方法は残忍な指導者に相当して、彼は多くの任務を派遣して、threadPoolという任劳恨むグループになくしてやります。
    このチームの中に少なくとも二人の選手がいます。彼らが忙しくて来られないなら、任務は任務リストに入れられます。
    たまっているミッションが多すぎて、リストに入れられない(3つ以上)場合は、新しいメンバーを雇って手伝います。しかし、コストを考慮して、多すぎる隊員を雇えなくて、せいぜい4人しか雇えません。
    もし4人の選手が忙しい時、また新しい任務があると、このグループは処理できなくなります。任務は一つの策略を通じて処理されます。へへ。
    メンバーの仕事にはコストが必要です。仕事が暇だったら、3 SECONDSまで新しい仕事がないと、解雇される選手もいます。しかし、グループの正常運行のために、いくら仕事が暇でも、チームのメンバーは2人を下回ってはいけません。
    4、produceTask SleepTimeとconsumeTaskSleepTimeのサイズを調整することにより、配信タスクと処理タスクの速度の制御を実現し、これらの2つの値を変更すると、異なる速度でのプログラムの動作状況を観察することができる。
    5、4の中で指すデータを調整することによって、タスクの廃棄戦略を調整し、他の3つの戦略に切り替えることによって、異なる戦略下の異なる処理方式が見られます。
    6、他の使用方法については、jdkの助けを参照してください。分かりやすく、使いやすいです。
    別の例:
    
    package demo;
    import java.util.Queue;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    public class ThreadPoolExecutorTest
    {
      private static int queueDeep = 4;
      public void createThreadPool()
      {
        /* 
         *      ,      2,      4,             3 , 
         *        4     ,          ,                , 
         *         (      ,      ),                    。 
         */ 
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 4, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueDeep),
            new ThreadPoolExecutor.DiscardOldestPolicy());
        //         10    
        for (int i = 0; i < 10; i++)
        {
          try
          {
            Thread.sleep(1);
          }
          catch (InterruptedException e)
          {
            e.printStackTrace();
          }
          while (getQueueSize(tpe.getQueue()) >= queueDeep)
          {
            System.out.println("    , 3      ");
            try
            {
              Thread.sleep(3000);
            }
            catch (InterruptedException e)
            {
              e.printStackTrace();
            }
          }
          TaskThreadPool ttp = new TaskThreadPool(i);
          System.out.println("put i:" + i);
          tpe.execute(ttp);
        }
        tpe.shutdown();
      }
      private synchronized int getQueueSize(Queue queue)
      {
        return queue.size();
      }
      public static void main(String[] args)
      {
        ThreadPoolExecutorTest test = new ThreadPoolExecutorTest();
        test.createThreadPool();
      }
      class TaskThreadPool implements Runnable
      {
        private int index;
        public TaskThreadPool(int index)
        {
          this.index = index;
        }
        public void run()
        {
          System.out.println(Thread.currentThread() + " index:" + index);
          try
          {
            Thread.sleep(3000);
          }
          catch (InterruptedException e)
          {
            e.printStackTrace();
          }
        }
      }
    }
    締め括りをつける
    以上はこの文章の全部の内容です。本文の内容は皆さんの学習や仕事に対して一定の参考学習価値を持ってほしいです。ありがとうございます。もっと知りたいなら、下のリンクを見てください。