Java並行プログラミング-25-タスクの結果の統合


一、タスクの結果の統合
Fork/Joinフレームワークは、RecursiveTaskクラスによって実装されたタスクを実行し、結果を返す能力を提供します.
RecursiveTaskクラスはForkJoinTaskクラスを継承しており,ForkJoinTaskはExecutorフレームワークが提供するFutureインタフェースを実装しています.
二、get()メソッド
get()メソッドはFutureインタフェースで定義されています.RecursiveTaskクラスはこのインタフェースを実装しているため,このメソッドを直接呼び出してタスクの戻り値を取得できます.
三、シミュレーション
ドキュメントタスク:ドキュメント内の各行を走査して,指定された語を検索します.
行タスク:指定された行でこの語を検索します.
文書タスクを行タスクに分割
package com.currency.forkandjoin;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RecursiveTask;

/**
 * Fork/Join task that counts the occurrences of a word in a range of
 * document lines.
 * <p>
 * A range of fewer than {@link #LINE_THRESHOLD} lines is handled directly by
 * spawning one {@link LineTask} per line; a larger range is split in half and
 * the two partial counts are added together.
 *
 * @author Nicholas
 *
 */
public class DocumentTask extends RecursiveTask<Integer> {

	private static final long serialVersionUID = 1L;

	/** Ranges with fewer lines than this are processed without further splitting. */
	private static final int LINE_THRESHOLD = 10;

	private final String[][] document;
	private final int start, end;
	private final String word;

	/**
	 * @param document the document as an array of lines, each line an array of words
	 * @param start    index of the first line to examine (inclusive)
	 * @param end      index one past the last line to examine (exclusive)
	 * @param word     the word to count
	 */
	public DocumentTask(String[][] document, int start, int end, String word) {
		this.document = document;
		this.start = start;
		this.end = end;
		this.word = word;
	}

	/**
	 * Counts the occurrences of the word in lines {@code [start, end)}.
	 *
	 * @return the number of occurrences found in this task's range
	 */
	@Override
	protected Integer compute() {
		if (end - start < LINE_THRESHOLD) {
			return processLines(document, start, end, word);
		}

		// start + (end - start) / 2 cannot overflow the way (end + start) / 2 can.
		int mid = start + (end - start) / 2;
		DocumentTask lowerHalf = new DocumentTask(document, start, mid, word);
		DocumentTask upperHalf = new DocumentTask(document, mid, end, word);
		invokeAll(lowerHalf, upperHalf);

		// invokeAll only returns normally once both subtasks are done, so
		// join() yields the results without the checked exceptions of get()
		// and propagates a failure instead of silently dropping a partial count.
		return groupResults(lowerHalf.join(), upperHalf.join());
	}

	/**
	 * Runs one {@link LineTask} per line in {@code [start, end)} and sums the
	 * per-line counts.
	 *
	 * @param document the document as an array of lines
	 * @param start    index of the first line (inclusive)
	 * @param end      index one past the last line (exclusive)
	 * @param word     the word to count
	 * @return the total number of occurrences in the given lines
	 */
	public Integer processLines(String[][] document, int start, int end,
			String word) {
		List<LineTask> tasks = new ArrayList<>();

		for (int i = start; i < end; i++) {
			tasks.add(new LineTask(document[i], 0, document[i].length, word));
		}
		invokeAll(tasks);

		int result = 0;
		for (LineTask lineTask : tasks) {
			result += lineTask.join();
		}

		return result;
	}

	/**
	 * Combines two partial counts.
	 *
	 * @param number1 first partial count
	 * @param number2 second partial count
	 * @return the sum of both counts
	 */
	public Integer groupResults(Integer number1, Integer number2) {
		return number1 + number2;
	}
}
package com.currency.forkandjoin;

import java.util.Random;

/**
 * Generates a random document, represented as a matrix of words, for the
 * word-counting example.
 *
 * @author Nicholas
 *
 */
public class DocumentMock {
	/** Vocabulary from which every generated word is drawn. */
	private String[] words = { "the", "hello", "world", "goodbye", "class",
			"main", "java", "c++", "python" };

	/**
	 * Builds a document of randomly chosen words and prints how many times
	 * {@code word} was placed in it.
	 *
	 * @param mumberLines number of lines (rows) to generate
	 * @param numberWords number of words (columns) per line
	 * @param word        the word whose occurrences are counted while generating
	 * @return the generated document, one {@code String[]} per line
	 */
	public String[][] generateDocument(int mumberLines, int numberWords,
			String word) {
		String[][] grid = new String[mumberLines][numberWords];
		Random rng = new Random();
		int matches = 0;

		// Fill row by row so the random sequence is consumed in row-major order.
		for (String[] row : grid) {
			for (int col = 0; col < row.length; col++) {
				String picked = words[rng.nextInt(words.length)];
				row[col] = picked;
				if (picked.equals(word)) {
					matches++;
				}
			}
		}

		// Report the expected count so it can be compared with the parallel result.
		System.out.println("DocumentMock : The word appears " + matches
				+ " times in the documents");

		return grid;
	}
}
package com.currency.forkandjoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.RecursiveTask;

/**
 * Fork/Join task that counts the occurrences of a word in a range of words of
 * a single line.
 * <p>
 * A range of fewer than {@link #WORD_THRESHOLD} words is counted directly; a
 * larger range is split in half and the two partial counts are added together.
 *
 * @author Nicholas
 *
 */
public class LineTask extends RecursiveTask<Integer> {

	private static final long serialVersionUID = 1L;

	/** Ranges with fewer words than this are counted without further splitting. */
	private static final int WORD_THRESHOLD = 100;

	private final String[] line;
	private final int start, end;
	private final String word;

	/**
	 * @param line  the words of the line
	 * @param start index of the first word to examine (inclusive)
	 * @param end   index one past the last word to examine (exclusive)
	 * @param word  the word to count
	 */
	public LineTask(String[] line, int start, int end, String word) {
		this.line = line;
		this.start = start;
		this.end = end;
		this.word = word;
	}

	/**
	 * Counts the occurrences of the word in positions {@code [start, end)}.
	 *
	 * @return the number of occurrences found in this task's range; never
	 *         {@code null}
	 */
	@Override
	protected Integer compute() {
		if (end - start < WORD_THRESHOLD) {
			return Count(line, start, end, word);
		}

		// start + (end - start) / 2 cannot overflow the way (end + start) / 2 can.
		int mid = start + (end - start) / 2;
		LineTask lowerHalf = new LineTask(line, start, mid, word);
		LineTask upperHalf = new LineTask(line, mid, end, word);
		invokeAll(lowerHalf, upperHalf);

		// Both subtasks are complete here; join() returns their results and
		// rethrows a subtask failure rather than leaving the result null.
		return groupResults(lowerHalf.join(), upperHalf.join());
	}

	/**
	 * Sequentially counts the occurrences of {@code word} in
	 * {@code line[start..end)}, then sleeps 10 ms.
	 *
	 * @param line  the words of the line
	 * @param start index of the first word (inclusive)
	 * @param end   index one past the last word (exclusive)
	 * @param word  the word to count
	 * @return the number of matching words
	 */
	public Integer Count(String[] line, int start, int end, String word) {
		int counter = 0;
		for (int i = start; i < end; i++) {
			if (line[i].equals(word)) {
				counter++;
			}
		}

		// Artificial delay kept from the example; presumably there to make the
		// task long enough for the pool statistics to be observable.
		try {
			Thread.sleep(10);
		} catch (InterruptedException e) {
			// Restore the interrupt flag so the executing thread can react to it.
			Thread.currentThread().interrupt();
		}

		return counter;
	}

	/**
	 * Combines two partial counts.
	 *
	 * @param number1 first partial count
	 * @param number2 second partial count
	 * @return the sum of both counts
	 */
	public Integer groupResults(Integer number1, Integer number2) {
		return number1 + number2;
	}
}
package com.currency.forkandjoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

/**
 * Example driver: generates a random document, counts a word in it with a
 * {@link DocumentTask} running on a {@link ForkJoinPool}, and prints pool
 * statistics once per second until the task is done.
 */
public class Main {

	/** Separator line printed before and after every pool status report. */
	private static final String BANNER = "****************************************************";

	public static void main(String[] args) {
		String[][] document = new DocumentMock().generateDocument(100, 1000, "the");
		DocumentTask documentTask = new DocumentTask(document, 0, 100, "the");

		ForkJoinPool pool = new ForkJoinPool();
		// execute() is asynchronous, so the loop below can observe the pool while it works.
		pool.execute(documentTask);

		boolean finished;
		do {
			reportStatus(pool);
			pauseOneSecond();
			finished = documentTask.isDone();
		} while (!finished);

		pool.shutdown();

		try {
			pool.awaitTermination(1, TimeUnit.DAYS);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		try {
			Object occurrences = documentTask.get();
			System.out.println("Main : The word appears " + occurrences
					+ " in the documents");
		} catch (InterruptedException | ExecutionException e) {
			e.printStackTrace();
		}
	}

	/** Prints one framed snapshot of the pool's statistics. */
	private static void reportStatus(ForkJoinPool pool) {
		System.out.println(BANNER);
		System.out.println("Main : Parallelism : " + pool.getParallelism());
		System.out.println("Main : Active Threads : " + pool.getActiveThreadCount());
		System.out.println("Main : Task Count : " + pool.getQueuedTaskCount());
		System.out.println("Main : Steal Count : " + pool.getStealCount());
		System.out.println(BANNER);
	}

	/** Sleeps for one second between status reports. */
	private static void pauseOneSecond() {
		try {
			TimeUnit.SECONDS.sleep(1);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

結果:
DocumentMock : The word appears 10997 times in the documents
****************************************************
Main : Parallelism : 4
Main : Active Threads : 1
Main : Task Count : 0
Main : Steal Count : 0
****************************************************
****************************************************
Main : Parallelism : 4
Main : Active Threads : 4
Main : Task Count : 32
Main : Steal Count : 0
****************************************************
****************************************************
Main : Parallelism : 4
Main : Active Threads : 4
Main : Task Count : 16
Main : Steal Count : 0
****************************************************
****************************************************
Main : Parallelism : 4
Main : Active Threads : 4
Main : Task Count : 19
Main : Steal Count : 2
****************************************************
****************************************************
Main : Parallelism : 4
Main : Active Threads : 2
Main : Task Count : 11
Main : Steal Count : 6
****************************************************
Main : The word appears 10997 in the documents