ThreadPoolExecutorにおける飽和戦略解析

8732 ワード

import java.util.concurrent.TimeUnit;

public class ThreadPoolTask implements Runnable {

	private final Object threadPoolTaskData;
	private static long consumerTaskSleepTime = 2L;
	
	public ThreadPoolTask(Object tasks) {
		this.threadPoolTaskData = tasks;
	}
	
	@Override
	public void run() {
		System.out.println("start :" + threadPoolTaskData);
		try {
			TimeUnit.SECONDS.sleep(consumerTaskSleepTime);
		} catch (Exception e) {
			e.printStackTrace();
		}
		System.out.println("finish " + threadPoolTaskData);   
	}
}

Abort(中止)ポリシー:このポリシーは、チェックされていないRejectedExecutionExceptionを放出します.呼び出し者は、この例外をキャプチャし、必要に応じて独自の処理コードを作成することができます.コードは次のとおりです.
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPool {

	private static int executePrograms = 0;
	private static int produceTaskMaxNumber = 10;
	
	/**
	 * @param args
	 */
	public static void main(String[] args) {
		ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 3, 
				TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3),new ThreadPoolExecutor.AbortPolicy());
		for(int i = 0 ; i < produceTaskMaxNumber; i ++) {
			try {
				String task = "task@ " + i;
				System.out.println("put " + task);
				threadPoolExecutor.execute(new ThreadPoolTask(task));
				TimeUnit.SECONDS.sleep(executePrograms);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
}

プログラムの実行結果は次のとおりです.
put task@ 0
put task@ 1
start :task@ 0
put task@ 2
put task@ 3
put task@ 4
put task@ 5
start :task@ 1
put task@ 6
start :task@ 5
put task@ 7
start :task@ 6
put task@ 8
java.util.concurrent.RejectedExecutionException
put task@ 9
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1774)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:768)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:656)
	at com.bohai.thread.pool.ThreadPool.main(ThreadPool.java:22)
java.util.concurrent.RejectedExecutionException
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1774)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:768)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:656)
	at com.bohai.thread.pool.ThreadPool.main(ThreadPool.java:22)
java.util.concurrent.RejectedExecutionException
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1774)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:768)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:656)
	at com.bohai.thread.pool.ThreadPool.main(ThreadPool.java:22)
finish task@ 0
start :task@ 2
finish task@ 1
start :task@ 3
finish task@ 5
start :task@ 4
finish task@ 6
finish task@ 2
finish task@ 3
finish task@ 4

解析実行結果:
corepoolsize = 2 ,maxpoolsize = 4,queue size = 3.実行中のスレッド数+キューで待機しているスレッド数=7、およびコミットされたスレッド数が7より大きい場合は例外が放出されます.
 
DiscardPolicyポリシーは、新しいコミットされたタスクをこっそり放棄します.コードは次のとおりです.
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPool {

	private static int executePrograms = 0;
	private static int produceTaskMaxNumber = 10;
	
	/**
	 * @param args
	 */
	public static void main(String[] args) {
		ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 3, 
				TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3),new ThreadPoolExecutor.DiscardPolicy());
		for(int i = 0 ; i < produceTaskMaxNumber; i ++) {
			try {
				String task = "task@ " + i;
				System.out.println("put " + task);
				threadPoolExecutor.execute(new ThreadPoolTask(task));
				TimeUnit.SECONDS.sleep(executePrograms);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
}

実行結果は次のとおりです.
put task@ 0
put task@ 1
start :task@ 0
put task@ 2
put task@ 3
put task@ 4
put task@ 5
start :task@ 1
put task@ 6
put task@ 7
put task@ 8
put task@ 9
start :task@ 6
start :task@ 5
finish task@ 0
finish task@ 6
start :task@ 2
finish task@ 1
start :task@ 3
start :task@ 4
finish task@ 5
finish task@ 2
finish task@ 3
finish task@ 4

解析結果:
提出されたタスク数は10個,開始と終了のタスク数は7個である.新しく提出された任務7、8、9を捨てた.
 
DiscardOldestPolicyは古いスレッドを捨てた.コードは次のとおりです.
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPool {

	private static int executePrograms = 0;
	private static int produceTaskMaxNumber = 10;
	
	/**
	 * @param args
	 */
	public static void main(String[] args) {
		ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 3, 
				TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3),new ThreadPoolExecutor.DiscardOldestPolicy());
		for(int i = 0 ; i < produceTaskMaxNumber; i ++) {
			try {
				String task = "task@ " + i;
				System.out.println("put " + task);
				threadPoolExecutor.execute(new ThreadPoolTask(task));
				TimeUnit.SECONDS.sleep(executePrograms);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
}

実行結果は次のとおりです.
put task@ 0
put task@ 1
put task@ 2
put task@ 3
put task@ 4
put task@ 5
put task@ 6
put task@ 7
put task@ 8
put task@ 9
start :task@ 0
start :task@ 5
start :task@ 1
start :task@ 6
finish task@ 0
start :task@ 7
finish task@ 5
start :task@ 8
finish task@ 1
start :task@ 9
finish task@ 6
finish task@ 7
finish task@ 8
finish task@ 9

結果の分析:
スレッドプールは2,3,4を捨て,スレッド0,1はキューに入らず実行を開始する.だから投げ出されない.
 
CallerRunsPolicyポリシーは、タスクを放棄したり、異常を投げ出したりすることなく、一部のタスクを呼び出し元に戻し、新しいタスクのトラフィックを低減する調整メカニズムを実現します.スレッドプールのスレッドで新しくコミットされたタスクを実行するのではなく、executeが呼び出されたスレッドで実行します.コードは次のとおりです.
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPool {

	private static int executePrograms = 0;
	private static int produceTaskMaxNumber = 10;
	
	/**
	 * @param args
	 */
	public static void main(String[] args) {
		ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 3, 
				TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3),new ThreadPoolExecutor.CallerRunsPolicy());
		for(int i = 0 ; i < produceTaskMaxNumber; i ++) {
			try {
				String task = "task@ " + i;
				System.out.println("put " + task);
				threadPoolExecutor.execute(new ThreadPoolTask(task));
				TimeUnit.SECONDS.sleep(executePrograms);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
}

実行結果は次のとおりです.
put task@ 0
put task@ 1
put task@ 2
put task@ 3
put task@ 4
put task@ 5
start :task@ 1
put task@ 6
put task@ 7
start :task@ 7
start :task@ 6
start :task@ 0
start :task@ 5
finish task@ 7
put task@ 8
finish task@ 1
start :task@ 2
finish task@ 6
finish task@ 0
finish task@ 5
put task@ 9
start :task@ 3
start :task@ 8
start :task@ 4
finish task@ 2
start :task@ 9
finish task@ 3
finish task@ 8
finish task@ 4
finish task@ 9

結果の分析:
すべてのスレッドが実行されました.