JAvaスレッドのexecutorsスレッドプール
11180 ワード
一、スレッドプールの役割
通常のビジネスでは、マルチスレッドを使用する場合は、ビジネス開始前にスレッドを作成し、ビジネス終了後にスレッドを破棄します.しかし、ビジネスにとって、スレッドの作成と破棄は、ビジネス自体とは関係なく、スレッドが実行するタスクにのみ関心を持っています.したがって、ビジネスに関係のないスレッドの作成や破棄ではなく、できるだけ多くのcpuを実行タスクに使用したいと考えています.スレッドプールはこの問題を解決しました.
スレッドプールの役割:スレッドプールの役割は、システム内で実行されるスレッドの数を制限することです.システムの環境状況に応じて、スレッドの数を自動的にまたは手動で設定し、実行に最適な効果を達成することができます.これにより、通常のスレッドの作成と破棄によるシステムオーバーヘッドを回避し、作成したスレッドが多すぎるため、システムリソースが消費され、サーバがダウンタイムになることを回避できます.Runtimeを使う.getRuntime().availableProcessors();スレッド数を設定します.
二、javaは提供したスレッドプールExecutorsクラスを発注する
A、newFixedThreadPoolは定長スレッドプールを作成するために使用され、スレッドの最大同時数を制御でき、超過したスレッドはキュー内で待機する
ExecutorService fixedThreadPool =Executors.newFixedThreadPool(1);
B、newSingleThreadExecutorは単一のスレッド化されたスレッドプールを作成するために使用され、それは唯一の作業スレッドだけでタスクを実行し、一度に1つしかサポートされず、すべてのタスクは指定された順序で実行される.
ExecutorService fixedThreadPool = Executors.newSingleThreadExecutor();
C、newCachedThreadPoolは、長さ制限なしでキャッシュ可能なスレッドプールを作成するために使用されます.新しいタスクでは、空きスレッドがある場合は空きスレッドを使用して実行し、ない場合は新しいスレッドを作成してタスクを実行します.スレッドプールの長さが処理の必要性を超える場合は、空きスレッドを柔軟に回収できます.
ExecutorService fixedThreadPool = Executors.newCachedThreadPool();
D、newScheduledThreadPoolは、一定のスレッドプールを作成し、タイミングと周期的な実行タスクをサポートするために使用されます.
ScheduledExecutorService executorsScheduled=Executors.newScheduledThreadPool(2);
newScheduledThreadPoolを使用したタイマーの実装
スレッドプールでタスクをコミットする2つの方法:
execute()メソッド:このメソッドはExecuterServiceインタフェースの親(インタフェース)メソッドで、このインタフェースにはこのメソッドしかありません.
submit()メソッド:ExecutorServiceインタフェースのメソッドです.
上のソースコードと説明からexecute()とsubmit()の方法の違いをまとめることができます.
1.受信パラメータが異なります.
2.submit()には戻り値があり、execute()にはありません.
三、カスタムスレッドプール
Javaスレッドプール内のnewCachedThreadPool,newFixedThreadPool,newSingleThreadExecutor,newScheduledThreadPoolの4つのスレッドプールは,いずれも最下位でThreadPoolExecutor()という構造方法を呼び出している.Executorsというクラスが私たちのニーズを満たすことができない場合は、自分でカスタムスレッドプールを作成することができます.ThreadPoolExecutorクラスの定義は次のとおりです.
カスタムスレッドプールでは、境界キュー(ArrayBlockingQueue、LinkedBlockingQueue)を使用します.
新しいタスクを実行する必要がある場合、スレッドプールの実際のスレッド数がcorePoolSizeコアスレッド数より小さい場合は、スレッドを優先的に作成します.corePoolSizeより大きい場合、余分なスレッドがキューに格納されます.キューがいっぱいで、最も要求されたスレッドがmaximumPoolSizeより小さい場合、カスタムスレッドプールは新しいスレッドを作成します.キューがいっぱいで、最も要求されたスレッドがmaximumPoolSizeより大きい場合、拒否ポリシーまたはその他のカスタム方式が実行されます.
結果:
結果を見ると、直接異常を投げ出すタスクが実行されていないことがわかります.キューがいっぱいで、最も要求されたスレッドがmaximumPoolSizeより大きい場合、AbortPolicy:例外を直接投げ出し、システムが正常に動作する(デフォルトのポリシー)拒否ポリシーが実行されます.
カスタムスレッドプールの使用×××キュー:
について×××キューはシステムリソースが消費されない限り、×××キューにタスクのエンキューに失敗することはありません.システムのスレッド数がcorePoolSizeより小さい場合、新しいスレッドはcorePoolSizeを実行し、corePoolSizeに達した後、余分なタスクをキューに入れてタスクの作成と処理の速度の違いが大きい場合、×××キューは、システムメモリが消費されるまで急速に増加します.×××キューのスレッドプールmaximumPoolSizeは実際には使用されません.
四、ポリシーの拒否
JDK提供ポリシー:
1.AbortPolicy:異常を直接放出し、システムが正常に動作する.(デフォルトのポリシー)
2.CallerRunsPolicy:スレッドプールが閉じていない限り、このポリシーは呼び出し元スレッドで直接実行され、現在破棄されているタスクが実行されます.3.DiscardOrderstPolicy:最も古いリクエストを破棄し、現在のタスクを再送信しようとします.4.処理できないタスクを破棄し、処理しない.
カスタムポリシー:RejectedExecutionHandlerインタフェースの実装が必要
通常のビジネスでは、マルチスレッドを使用する場合は、ビジネス開始前にスレッドを作成し、ビジネス終了後にスレッドを破棄します.しかし、ビジネスにとって、スレッドの作成と破棄は、ビジネス自体とは関係なく、スレッドが実行するタスクにのみ関心を持っています.したがって、ビジネスに関係のないスレッドの作成や破棄ではなく、できるだけ多くのcpuを実行タスクに使用したいと考えています.スレッドプールはこの問題を解決しました.
スレッドプールの役割:スレッドプールの役割は、システム内で実行されるスレッドの数を制限することです.システムの環境状況に応じて、スレッドの数を自動的にまたは手動で設定し、実行に最適な効果を達成することができます.これにより、通常のスレッドの作成と破棄によるシステムオーバーヘッドを回避し、作成したスレッドが多すぎるため、システムリソースが消費され、サーバがダウンタイムになることを回避できます.Runtimeを使う.getRuntime().availableProcessors();スレッド数を設定します.
二、javaは提供したスレッドプールExecutorsクラスを発注する
A、newFixedThreadPoolは定長スレッドプールを作成するために使用され、スレッドの最大同時数を制御でき、超過したスレッドはキュー内で待機する
ExecutorService fixedThreadPool =Executors.newFixedThreadPool(1);
public static ExecutorService newFixedThreadPool(int nThreads) { // , // : // : (0L ) // : // :LinkedBlockingQueue ××× ,
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue());
}
B、newSingleThreadExecutorは単一のスレッド化されたスレッドプールを作成するために使用され、それは唯一の作業スレッドだけでタスクを実行し、一度に1つしかサポートされず、すべてのタスクは指定された順序で実行される.
ExecutorService fixedThreadPool = Executors.newSingleThreadExecutor();
public static ExecutorService newSingleThreadExecutor() { // , // : // : (0L ) // : // :LinkedBlockingQueue ,
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue()));
}
C、newCachedThreadPoolは、長さ制限なしでキャッシュ可能なスレッドプールを作成するために使用されます.新しいタスクでは、空きスレッドがある場合は空きスレッドを使用して実行し、ない場合は新しいスレッドを作成してタスクを実行します.スレッドプールの長さが処理の必要性を超える場合は、空きスレッドを柔軟に回収できます.
ExecutorService fixedThreadPool = Executors.newCachedThreadPool();
public static ExecutorService newCachedThreadPool() { // , // :Integer.MAX_VALUE // : (60L 60 ) // : SECONDS // :SynchronousQueue ,
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());
}
D、newScheduledThreadPoolは、一定のスレッドプールを作成し、タイミングと周期的な実行タスクをサポートするために使用されます.
ScheduledExecutorService executorsScheduled=Executors.newScheduledThreadPool(2);
public ScheduledThreadPoolExecutor(int corePoolSize) { // , // :Integer.MAX_VALUE // : (0 0 ) // : SECONDS // :DelayedWorkQueue
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
}
newScheduledThreadPoolを使用したタイマーの実装
package com.jalja.org.thread.executors;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.ScheduledFuture;import java.util.concurrent.TimeUnit;public class NewScheduledThreadPoolTest { public static void main(String[] args) {
Runnable runnable=new ScheduledThread(); //
ScheduledExecutorService executorsScheduled=Executors.newScheduledThreadPool(2); //runnable 1: ( 1 ) 3: ( 3 ) //TimeUnit.SECONDS:
ScheduledFuture> scheduledFuture= executorsScheduled.scheduleWithFixedDelay(runnable, 1,3, TimeUnit.SECONDS);
System.out.println("scheduledFuture:"+scheduledFuture);
}
}
class ScheduledThread implements Runnable{ public void run() {
System.out.println(Thread.currentThread().getName() +"=> "); try {
Thread.sleep(1000);
} catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() +"=> ");
}
}
スレッドプールでタスクをコミットする2つの方法:
execute()メソッド:このメソッドはExecuterServiceインタフェースの親(インタフェース)メソッドで、このインタフェースにはこのメソッドしかありません.
public interface Executor { void execute(Runnable command);
}
submit()メソッド:ExecutorServiceインタフェースのメソッドです.
public interface ExecutorService extends Executor {
...
Future submit(Callable task);
Future submit(Runnable task, T result);
Future> submit(Runnable task);
...
}
上のソースコードと説明からexecute()とsubmit()の方法の違いをまとめることができます.
1.受信パラメータが異なります.
2.submit()には戻り値があり、execute()にはありません.
三、カスタムスレッドプール
Javaスレッドプール内のnewCachedThreadPool,newFixedThreadPool,newSingleThreadExecutor,newScheduledThreadPoolの4つのスレッドプールは,いずれも最下位でThreadPoolExecutor()という構造方法を呼び出している.Executorsというクラスが私たちのニーズを満たすことができない場合は、自分でカスタムスレッドプールを作成することができます.ThreadPoolExecutorクラスの定義は次のとおりです.
public ThreadPoolExecutor(int corePoolSize,// --
int maximumPoolSize,// ,
long keepAliveTime,//
TimeUnit unit,//
BlockingQueue workQueue,//
ThreadFactory threadFactory//
) {……}
カスタムスレッドプールでは、境界キュー(ArrayBlockingQueue、LinkedBlockingQueue)を使用します.
新しいタスクを実行する必要がある場合、スレッドプールの実際のスレッド数がcorePoolSizeコアスレッド数より小さい場合は、スレッドを優先的に作成します.corePoolSizeより大きい場合、余分なスレッドがキューに格納されます.キューがいっぱいで、最も要求されたスレッドがmaximumPoolSizeより小さい場合、カスタムスレッドプールは新しいスレッドを作成します.キューがいっぱいで、最も要求されたスレッドがmaximumPoolSizeより大きい場合、拒否ポリシーまたはその他のカスタム方式が実行されます.
package com.jalja.org.thread.executors;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class ExecutorsTest { public static void main(String[] args) {
ThreadPoolExecutor test=new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue(2));
test.execute(new ExecutorsTest().new ThreadTest());
test.execute(new ExecutorsTest().new ThreadTest());
test.execute(new ExecutorsTest().new ThreadTest());
test.execute(new ExecutorsTest().new ThreadTest());
test.execute(new ExecutorsTest().new ThreadTest());
test.shutdown();
} class ThreadTest implements Runnable{ public void run() {
System.out.println(Thread.currentThread().getName()); try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
結果:
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.jalja.org.thread.executors.ExecutorsTest$ThreadTest@70dea4e rejected from java.util.concurrent.ThreadPoolExecutor@5c647e05[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at com.jalja.org.thread.executors.ExecutorsTest.main(ExecutorsTest.java:17)
pool-1-thread-1pool-1-thread-2pool-1-thread-1pool-1-thread-2
結果を見ると、直接異常を投げ出すタスクが実行されていないことがわかります.キューがいっぱいで、最も要求されたスレッドがmaximumPoolSizeより大きい場合、AbortPolicy:例外を直接投げ出し、システムが正常に動作する(デフォルトのポリシー)拒否ポリシーが実行されます.
カスタムスレッドプールの使用×××キュー:
について×××キューはシステムリソースが消費されない限り、×××キューにタスクのエンキューに失敗することはありません.システムのスレッド数がcorePoolSizeより小さい場合、新しいスレッドはcorePoolSizeを実行し、corePoolSizeに達した後、余分なタスクをキューに入れてタスクの作成と処理の速度の違いが大きい場合、×××キューは、システムメモリが消費されるまで急速に増加します.×××キューのスレッドプールmaximumPoolSizeは実際には使用されません.
四、ポリシーの拒否
JDK提供ポリシー:
1.AbortPolicy:異常を直接放出し、システムが正常に動作する.(デフォルトのポリシー)
package com.jalja.org.thread.executors;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.RejectedExecutionException;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class ExecutorsTest { public static void main(String[] args) {
BlockingQueue f=new LinkedBlockingQueue(2);
ThreadPoolExecutor test=new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,f); try {
test.execute(new ExecutorsTest().new ThreadTest());
test.execute(new ExecutorsTest().new ThreadTest());
test.execute(new ExecutorsTest().new ThreadTest());
test.execute(new ExecutorsTest().new ThreadTest());
test.execute(new ExecutorsTest().new ThreadTest());
} catch (RejectedExecutionException e) {
e.printStackTrace();
System.out.println(" ");
}
test.shutdown();
} class ThreadTest implements Runnable{ public void run() {
System.out.println(Thread.currentThread().getName()); try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
2.CallerRunsPolicy:スレッドプールが閉じていない限り、このポリシーは呼び出し元スレッドで直接実行され、現在破棄されているタスクが実行されます.3.DiscardOrderstPolicy:最も古いリクエストを破棄し、現在のタスクを再送信しようとします.4.処理できないタスクを破棄し、処理しない.
カスタムポリシー:RejectedExecutionHandlerインタフェースの実装が必要
package com.jalja.org.thread.executors;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.RejectedExecutionException;import java.util.concurrent.RejectedExecutionHandler;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class ExecutorsTest { public static void main(String[] args) {
BlockingQueue f=new LinkedBlockingQueue(2);
ThreadPoolExecutor test=new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,f, new MyRejected());
test.execute(new ExecutorsTest().new ThreadTest());
test.execute(new ExecutorsTest().new ThreadTest());
test.execute(new ExecutorsTest().new ThreadTest());
test.execute(new ExecutorsTest().new ThreadTest());
test.execute(new ExecutorsTest().new ThreadTest());
test.shutdown();
} class ThreadTest implements Runnable{ public void run() {
System.out.println(Thread.currentThread().getName()); try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}class MyRejected implements RejectedExecutionHandler{ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(" ");
}
}