Java開発ノート(百零六)Fork+Joinフレームワークの分割を実現


通常のスレッドプールとタイマスレッドプールの使用方法については、スレッドプールの内部スレッド間には関連がないという共通点がありますが、場合によっては各スレッド間に前因結果関係がある場合があります.例えば国勢調査では、わが国の総人口が14億前後であることはよく知られていますが、14億の数はどのように数えられていますか.一人だけ統計をとると、小数から老まで数えきれない.例えば、1つのスレッドの牛が車を破ってもあまり仕事ができないように、それなら、もっとスレッドを作ってみてはいかがでしょうか.そこで国勢調査の仕事は中央から各省に分解され、各省はまた下の市県に割り当てられ、市県からもっと下の街や郷鎮に割り当てられ、各街と郷鎮が管轄区内の人口数を統計した後、それぞれ対応する市県に報告され、市県は省に報告され、最後に各省から中央に報告され、全国の人口総数を統計した.国勢調査の例では、これらのスレッドには上下関係があるだけでなく、下位スレッドのタスクは上位スレッドから割り当てられ、下位スレッドの処理結果は上位スレッドの要約に渡される.タスクフローの行方によって、全体の処理過程を以下の3つの段階に分けることができる:1、第1段階はメインスレッドから始まり、上から下へタスクを段階的に分解し、この時スレッドの総数は次第に多くなり、各スレッドは前後して上位スレッドから割り当てられたタスクを受け取る.2、第2段階は一番下の末端スレッドによって具体的な任務操作を行い、この時スレッドの総数は変わらない.3、第3段階は末端スレッドから始まり、下から上へ段階的にタスク結果をまとめる.この時、スレッド総数は次第に少なくなり、最後にメインスレッドは要約完了の最終結果を受け取る.以上の第1段階は、概括的に「分けて治める」と呼ばれている.第3段階については、要約して「集約帰一」と呼ぶことができる.このような別々のビジネスニーズを実現するために、Java 7はFork/Joinフレームワークを追加し、対症的に薬を投与した.このフレームワークのFork操作は、ツリー構造に従って下位スレッドを絶えず分離し、それに対応するのは分割されたプロセスである.Join操作では,リーフスレッドの演算結果を逐次集計し,それに対応するプロセスを集約する.この分割の過程で,Fork/Joinフレームワーク専用のスレッドプールツールForkJoinPoolがひっそりと浮かび上がり,ExecutorServiceから派生したサブクラスである.分割ポリシーの特殊な性質に鑑みて、Fork/Joinフレームワークは汎用的なRunnableタスクを使用しないで、専門的な再帰タスクRecursiveTaskを使用するように変更され、このタスクのforkメソッドは分割されたForkオペレーションを実現し、joinメソッドは集約されたJoinオペレーションを実現した.簡単な適用例を挙げると、0から99の間のすべての整数の合計など、連続した数列の合計に対して、通常はループ文を書いて順次加算します.従来の書き方は明らかに1つのメインスレッドだけが加算演算を実行しており、マルチコアCPUの性能優位性を体現できないため、求和操作を分けて治すことを試みることができ、まずセグメント数列全体をいくつかのサブ数列に分け、各サブ数列をそれぞれ和を求め、最後にすべてのサブ数列の和をまとめることができる.RecursiveTaskを用いてこのような割り当て和タスクを実現する場合、再帰タスクのエントリがrunメソッドからcomputeメソッドに変更されたことに注意するコード例を参照してください.
//           
public class SumTask extends RecursiveTask {
	private static final long serialVersionUID = 1L;
	private static final int THRESHOLD = 20; //             
	private int src[]; //         
	private int start; //          
	private int end; //          

	public SumTask(int[] src, int start, int end) {
		this.src = src;
		this.start = start;
		this.end = end;
	}

	//             
	private Integer subTotal() {
		Integer sum = 0;
		for (int i = start; i < end; i++) { //              
			sum += src[i];
		}
		//       ,         、    、    、    
		String desc = String.format("%s ∑(%d~%d)=%d", Thread.currentThread().getName(), start, end, sum);
		System.out.println(desc);
		return sum;
	}

	@Override
	protected Integer compute() {
		if ((end - start) <= THRESHOLD) { //       
			return subTotal(); //             
		} else { //     ,      
			int middle = (start + end) / 2; //          
			//            
			SumTask left = new SumTask(src, start, middle);
			left.fork(); //                
			//            
			SumTask right = new SumTask(src, middle, end);
			right.fork(); //                
			//                       ,           
			int sum = left.join() + right.join();
			//       ,         、    、    、    
			String desc = String.format("%s ∑(%d~%d)=%d", Thread.currentThread().getName(), start, end, sum);
			System.out.println(desc);
			return sum; //            
		}
	}
}

次に、外部から上の和を求めるタスクに和を求める整数配列を入力し、タスクオブジェクトのinvokeを呼び出して実行結果を取得すると、コマンドに組み込まれたスレッドプールで和を求めるタスクを開始できます.呼び出しコードの例は次のとおりです.
//             
private static void testInternalTask() {
	//       0 99     
	int[] arr = new int[100];
	for (int i = 0; i < 100; i++) {
		arr[i] = i + 1;
	}
	//            
	SumTask task = new SumTask(arr, 0, arr.length);
	try {
		//       ,       。   invoke        ForkJoinPool
		Integer result = task.invoke();
		System.out.println("      : " + result);
	} catch (Exception e) {
		e.printStackTrace();
	}
}

以上の呼び出しコードを実行し、次のスレッドプールログを出力します.
ForkJoinPool.commonPool-worker-3: ∑(0~12)=78
ForkJoinPool.commonPool-worker-0: ∑(75~87)=978
ForkJoinPool.commonPool-worker-2: ∑(50~62)=678
ForkJoinPool.commonPool-worker-0: ∑(87~100)=1222
ForkJoinPool.commonPool-worker-3: ∑(12~25)=247
ForkJoinPool.commonPool-worker-3: ∑(0~25)=325
ForkJoinPool.commonPool-worker-0: ∑(75~100)=2200
ForkJoinPool.commonPool-worker-2: ∑(62~75)=897
ForkJoinPool.commonPool-worker-2: ∑(50~75)=1575
ForkJoinPool.commonPool-worker-1: ∑(37~50)=572
ForkJoinPool.commonPool-worker-3: ∑(25~37)=378
ForkJoinPool.commonPool-worker-3: ∑(25~50)=950
ForkJoinPool.commonPool-worker-1: ∑(0~50)=1275
ForkJoinPool.commonPool-worker-2: ∑(50~100)=3775
main: ∑(0~100)=5050
      : 5050

ログから、Fork/Joinフレームワークのデフォルトスレッドプールでは、4つのスレッド(デバイスのCPU数)が起動し、最後の統計作業はプライマリ・スレッドで完了します.
前述の呼び出しコードには、Fork/JoinフレームワークのスレッドプールツールForkJoinPoolが明記されていないことに注意してください.これは、再帰タスクにはデフォルトの内蔵スレッドプールがあり、外部がスレッドプールオブジェクトを指定しなくても、再帰タスクは内蔵スレッドプールを使用してスレッドスケジューリングを行うためです.ただし、デフォルトのスレッドプールではパーソナライズされたパラメータを設定できないため、コードにForkJoinPoolスレッドプールを明示的に指定し、スレッドプールオブジェクトのexecute/invoke/submitの3つのメソッドの1つを呼び出して再帰タスクを開始することをお勧めします.この3つのメソッドの具体的な用途については、execute:指定したタスクを非同期で実行し、戻り値がありません.invoke:指定したタスクを同期して実行し、戻り値を待機します.戻り値は最終的な演算結果です.submit:指定したタスクを非同期で実行し、結果タスクオブジェクトを返します.その後、結果タスクを呼び出すgetメソッドを選択して最終的な演算結果を取得できます.次に、外部呼び出し時にスレッドプールを明示的に指定する和コードの例を示します.
//             
private static void testPoolTask() {
	//       0 99     
	int[] arr = new int[100];
	for (int i = 0; i < 100; i++) {
		arr[i] = i + 1;
	}
	//            
	SumTask task = new SumTask(arr, 0, arr.length);
	//               ,     6
	ForkJoinPool pool = new ForkJoinPool(6);
	//            ,              
	ForkJoinTask taskResult = pool.submit(task);
	try {
		Integer result = taskResult.get(); //       ,          
		System.out.println("      : " + result);
	} catch (Exception e) {
		e.printStackTrace();
	}
	pool.shutdown(); //      
}

変更されたコールコードを実行し、次のスレッドプールログを出力します.
ForkJoinPool-1-worker-1: ∑(0~12)=78
ForkJoinPool-1-worker-3: ∑(62~75)=897
ForkJoinPool-1-worker-5: ∑(12~25)=247
ForkJoinPool-1-worker-5: ∑(87~100)=1222
ForkJoinPool-1-worker-5: ∑(25~37)=378
ForkJoinPool-1-worker-5: ∑(37~50)=572
ForkJoinPool-1-worker-5: ∑(25~50)=950
ForkJoinPool-1-worker-1: ∑(0~25)=325
ForkJoinPool-1-worker-4: ∑(50~62)=678
ForkJoinPool-1-worker-4: ∑(50~75)=1575
ForkJoinPool-1-worker-6: ∑(75~87)=978
ForkJoinPool-1-worker-6: ∑(75~100)=2200
ForkJoinPool-1-worker-2: ∑(0~50)=1275
ForkJoinPool-1-worker-3: ∑(50~100)=3775
ForkJoinPool-1-worker-1: ∑(0~100)=5050
      : 5050

ログにより、このときのスレッドプールの実行状況は、さっきとは2つの点が異なります.オープンするとスレッドの数が多くなります.これは、新しいスレッドプールオブジェクトが同時数6を設定しているためです.2つの最後のステップの統計作業は、オンライン・スレッド・プール内で実行されるため、プライマリ・スレッドの負担が軽減されます.結論はもちろん外部から明示的にForkJoinPoolを指定する方法がより優れている.
Javaテクノロジーの詳細については、「Java開発ノート(シーケンス)章ディレクトリ」を参照してください.