JAvaスレッドのexecutorsスレッドプール

11180 ワード

一、スレッドプールの役割
通常のビジネスでは、マルチスレッドを使用する場合は、ビジネス開始前にスレッドを作成し、ビジネス終了後にスレッドを破棄します.しかし、ビジネスにとって、スレッドの作成と破棄は、ビジネス自体とは関係なく、スレッドが実行するタスクにのみ関心を持っています.したがって、ビジネスに関係のないスレッドの作成や破棄ではなく、できるだけ多くのcpuを実行タスクに使用したいと考えています.スレッドプールはこの問題を解決しました.
スレッドプールの役割:スレッドプールの役割は、システム内で実行されるスレッドの数を制限することです.システムの環境状況に応じて、スレッドの数を自動的にまたは手動で設定し、実行に最適な効果を達成することができます.これにより、通常のスレッドの作成と破棄によるシステムオーバーヘッドを回避し、作成したスレッドが多すぎるため、システムリソースが消費され、サーバがダウンタイムになることを回避できます.Runtimeを使う.getRuntime().availableProcessors();スレッド数を設定します.
二、javaは提供したスレッドプールExecutorsクラスを発注する
A、newFixedThreadPoolは定長スレッドプールを作成するために使用され、スレッドの最大同時数を制御でき、超過したスレッドはキュー内で待機する
           ExecutorService fixedThreadPool =Executors.newFixedThreadPool(1);
 public static ExecutorService newFixedThreadPool(int nThreads) {        //               ,                         //   :                 //   :      (0L                  )        //   :              //   :LinkedBlockingQueue ×××   ,                
        return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue());
    }

B、newSingleThreadExecutorは単一のスレッド化されたスレッドプールを作成するために使用され、それは唯一の作業スレッドだけでタスクを実行し、一度に1つしかサポートされず、すべてのタスクは指定された順序で実行される.
    ExecutorService fixedThreadPool = Executors.newSingleThreadExecutor();
public static ExecutorService newSingleThreadExecutor() {        //               ,                         //   :                 //   :      (0L                  )        //   :              //   :LinkedBlockingQueue      ,                
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue()));
    }

C、newCachedThreadPoolは、長さ制限なしでキャッシュ可能なスレッドプールを作成するために使用されます.新しいタスクでは、空きスレッドがある場合は空きスレッドを使用して実行し、ない場合は新しいスレッドを作成してタスクを実行します.スレッドプールの長さが処理の必要性を超える場合は、空きスレッドを柔軟に回収できます.
ExecutorService fixedThreadPool = Executors.newCachedThreadPool(); 
 public static ExecutorService newCachedThreadPool() {        //               ,                         //   :Integer.MAX_VALUE                    //   :      (60L       60      )        //   :       SECONDS          //   :SynchronousQueue       ,                   
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());
    }

D、newScheduledThreadPoolは、一定のスレッドプールを作成し、タイミングと周期的な実行タスクをサポートするために使用されます.
    ScheduledExecutorService executorsScheduled=Executors.newScheduledThreadPool(2);
public ScheduledThreadPoolExecutor(int corePoolSize) {        //               ,                         //   :Integer.MAX_VALUE                    //   :      (0       0      )        //   :       SECONDS          //   :DelayedWorkQueue      
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
    }

newScheduledThreadPoolを使用したタイマーの実装
package com.jalja.org.thread.executors;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.ScheduledFuture;import java.util.concurrent.TimeUnit;public class NewScheduledThreadPoolTest {    public static void main(String[] args) {
        Runnable runnable=new ScheduledThread();        //     
        ScheduledExecutorService executorsScheduled=Executors.newScheduledThreadPool(2);        //runnable          1:     (     1    )  3:    (  3   )        //TimeUnit.SECONDS:    
        ScheduledFuture> scheduledFuture= executorsScheduled.scheduleWithFixedDelay(runnable, 1,3, TimeUnit.SECONDS);
        System.out.println("scheduledFuture:"+scheduledFuture);
    }
} 
class  ScheduledThread implements Runnable{    public void run() {
        System.out.println(Thread.currentThread().getName() +"=>  ");        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {            // TODO Auto-generated catch block            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() +"=>  ");
    }
}

スレッドプールでタスクをコミットする2つの方法:
execute()メソッド:このメソッドはExecuterServiceインタフェースの親(インタフェース)メソッドで、このインタフェースにはこのメソッドしかありません.
 
public interface Executor {    void execute(Runnable command);
}

 
submit()メソッド:ExecutorServiceインタフェースのメソッドです.
public interface ExecutorService extends Executor {
  ...
   Future submit(Callable task);

   Future submit(Runnable task, T result);

  Future> submit(Runnable task);
  ...
}

上のソースコードと説明からexecute()とsubmit()の方法の違いをまとめることができます.
  1.受信パラメータが異なります.
  2.submit()には戻り値があり、execute()にはありません.
 
三、カスタムスレッドプール
Javaスレッドプール内のnewCachedThreadPool,newFixedThreadPool,newSingleThreadExecutor,newScheduledThreadPoolの4つのスレッドプールは,いずれも最下位でThreadPoolExecutor()という構造方法を呼び出している.Executorsというクラスが私たちのニーズを満たすことができない場合は、自分でカスタムスレッドプールを作成することができます.ThreadPoolExecutorクラスの定義は次のとおりです.
                public ThreadPoolExecutor(int corePoolSize,//     --               
                   int maximumPoolSize,//     ,               
                   long keepAliveTime,//        
                   TimeUnit unit,//          
                   BlockingQueue workQueue,//        
                   ThreadFactory threadFactory//      
                 ) {……}

カスタムスレッドプールでは、境界キュー(ArrayBlockingQueue、LinkedBlockingQueue)を使用します.
新しいタスクを実行する必要がある場合、スレッドプールの実際のスレッド数がcorePoolSizeコアスレッド数より小さい場合は、スレッドを優先的に作成します.corePoolSizeより大きい場合、余分なスレッドがキューに格納されます.キューがいっぱいで、最も要求されたスレッドがmaximumPoolSizeより小さい場合、カスタムスレッドプールは新しいスレッドを作成します.キューがいっぱいで、最も要求されたスレッドがmaximumPoolSizeより大きい場合、拒否ポリシーまたはその他のカスタム方式が実行されます.
package com.jalja.org.thread.executors;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class ExecutorsTest {    public static void main(String[] args) {
        ThreadPoolExecutor test=new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue(2));
        test.execute(new ExecutorsTest().new ThreadTest());
        test.execute(new ExecutorsTest().new ThreadTest());
        test.execute(new ExecutorsTest().new ThreadTest());
        test.execute(new ExecutorsTest().new ThreadTest());
        test.execute(new ExecutorsTest().new ThreadTest());
        test.shutdown();
    }    class ThreadTest implements Runnable{        public void run() {
            System.out.println(Thread.currentThread().getName());            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } 
        }
    }
}

結果:
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.jalja.org.thread.executors.ExecutorsTest$ThreadTest@70dea4e rejected from java.util.concurrent.ThreadPoolExecutor@5c647e05[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at com.jalja.org.thread.executors.ExecutorsTest.main(ExecutorsTest.java:17)
pool-1-thread-1pool-1-thread-2pool-1-thread-1pool-1-thread-2

結果を見ると、直接異常を投げ出すタスクが実行されていないことがわかります.キューがいっぱいで、最も要求されたスレッドがmaximumPoolSizeより大きい場合、AbortPolicy:例外を直接投げ出し、システムが正常に動作する(デフォルトのポリシー)拒否ポリシーが実行されます.
カスタムスレッドプールの使用×××キュー:
について×××キューはシステムリソースが消費されない限り、×××キューにタスクのエンキューに失敗することはありません.システムのスレッド数がcorePoolSizeより小さい場合、新しいスレッドはcorePoolSizeを実行し、corePoolSizeに達した後、余分なタスクをキューに入れてタスクの作成と処理の速度の違いが大きい場合、×××キューは、システムメモリが消費されるまで急速に増加します.×××キューのスレッドプールmaximumPoolSizeは実際には使用されません.
四、ポリシーの拒否
JDK提供ポリシー:
 
1.AbortPolicy:異常を直接放出し、システムが正常に動作する.(デフォルトのポリシー)
package com.jalja.org.thread.executors;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.RejectedExecutionException;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class ExecutorsTest {    public static void main(String[] args) {
        BlockingQueue f=new LinkedBlockingQueue(2);
        ThreadPoolExecutor test=new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,f);        try {
            test.execute(new ExecutorsTest().new ThreadTest());
            test.execute(new ExecutorsTest().new ThreadTest());
            test.execute(new ExecutorsTest().new ThreadTest());
            test.execute(new ExecutorsTest().new ThreadTest());
            test.execute(new ExecutorsTest().new ThreadTest());
        } catch (RejectedExecutionException e) {
            e.printStackTrace();
            System.out.println("             ");
        }
        test.shutdown();
    }    class ThreadTest implements Runnable{        public void run() {
            System.out.println(Thread.currentThread().getName());            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } 
        }
    }
}

2.CallerRunsPolicy:スレッドプールが閉じていない限り、このポリシーは呼び出し元スレッドで直接実行され、現在破棄されているタスクが実行されます.3.DiscardOrderstPolicy:最も古いリクエストを破棄し、現在のタスクを再送信しようとします.4.処理できないタスクを破棄し、処理しない.
カスタムポリシー:RejectedExecutionHandlerインタフェースの実装が必要
package com.jalja.org.thread.executors;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.RejectedExecutionException;import java.util.concurrent.RejectedExecutionHandler;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class ExecutorsTest {    public static void main(String[] args) {
        BlockingQueue f=new LinkedBlockingQueue(2);
        ThreadPoolExecutor test=new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,f, new MyRejected());
            test.execute(new ExecutorsTest().new ThreadTest());
            test.execute(new ExecutorsTest().new ThreadTest());
            test.execute(new ExecutorsTest().new ThreadTest());
            test.execute(new ExecutorsTest().new ThreadTest());
            test.execute(new ExecutorsTest().new ThreadTest());
            test.shutdown();
    }    class ThreadTest implements Runnable{        public void run() {
            System.out.println(Thread.currentThread().getName());            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } 
        }
    }
}class MyRejected implements RejectedExecutionHandler{    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("           ");
    }
}