1 GBのメモリだけで、10 GBのファイル内のデータをソートする方法


考え方
     1. 外部マージソート（データを分割してそれぞれソートし、ソート済みファイルをマージする）
     2. RecursiveTask（Fork/Join）によるマルチスレッド化
テスト用データファイルを生成するコード
/***
 * Generates the test input for the external sort: one pseudo-random int per line.
 *
 *@author dongsheng
 *@date 2019/1/18 22:58
 *@version 1.0.0
 */
public class GenerateNumber {

	/** File the generated numbers are written to (overwritten if it exists). */
	private static final String filePath="F:/file_test/data1.txt";

	/**
	 * Writes 1,000,000 random ints to {@link #filePath}, one per line, and prints
	 * the start timestamp and the elapsed milliseconds to standard output.
	 */
	public static void main(String[] args) throws IOException {
		final Random rng = new Random();
		final File target = new File(filePath);
		try (PrintWriter writer = new PrintWriter(target)) {
			final long start = System.currentTimeMillis();
			System.out.println("beginTime:" + start);
			int written = 0;
			while (written < 1_000_000) {
				writer.println(rng.nextInt());
				// Periodically push buffered output to the file.
				boolean flushPoint = written % 10000 == 0;
				if (flushPoint) {
					writer.flush();
				}
				written++;
			}
			// Despite the label, this value is the elapsed time in milliseconds.
			long elapsed = System.currentTimeMillis() - start;
			System.out.println("endTime:" + elapsed);
		}
	}
}

ソート処理のコード
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;


/***
 * External sort of a text file of ints (one per line) using bounded memory.
 * <p>
 * The input is read in chunks of at most {@code size} values. Each chunk is sorted
 * in memory with a fork/join merge sort and written to its own part file. The part
 * files are then merged into {@link #afterFilePath}.
 *
 *@author dongsheng
 *@date 2019/1/18 22:58
 *@version 1.0.0
 */
public class SortByForkjoin {

	/** Input file: one decimal int per line. */
	private static final String filePath="F:/file_test/data1.txt";
	/** Final, fully sorted output file. */
	private static final String afterFilePath="F:/file_test/data1-sort.txt";

	public static void main(String[] args) throws Exception {
		ForkJoinPool pool = new ForkJoinPool();
		try {
			/*
			 * Chunk size: the number of ints held in memory at once. The same buffer
			 * is reused for every chunk, so memory use does not grow with the input.
			 */
			int size = 100_000;
			int[] array = new int[size];
			int i = 0;
			int partition = 0;

			// Names of the sorted part files, in creation order.
			List<String> partFiles = new ArrayList<>();
			try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
				String line;
				while ((line = reader.readLine()) != null) {
					array[i++] = Integer.parseInt(line);
					if (i == size) {
						// Buffer full: sort it, spill it to a part file and start refilling.
						i = 0;
						String filename = "F:/file_test/data1-part-" + partition++ + ".txt";
						doPartitionSort(pool, filename, array, 0, size);
						partFiles.add(filename);
					}
				}
			}

			// Spill the final, partially filled chunk.
			if (i > 0) {
				String filename = "F:/file_test/data1-part-" + partition++ + ".txt";
				doPartitionSort(pool, filename, array, 0, i);
				partFiles.add(filename);
			}

			if (partition > 1) {
				// Several sorted parts: merge them (in parallel) into the output file.
				pool.invoke(new MergerFileSortTask(partFiles, afterFilePath));
			} else if (partition == 1) {
				// A single part is already fully sorted; it just has to become the output.
				Files.copy(Paths.get(partFiles.get(0)), Paths.get(afterFilePath),
						StandardCopyOption.REPLACE_EXISTING);
			} else {
				// Empty input: still produce an (empty) output file.
				Files.write(Paths.get(afterFilePath), new byte[0]);
			}
		} finally {
			pool.shutdown();
		}
	}

	/**
	 * Sorts {@code array[start, end)} in place on {@code pool}, then writes that
	 * range to {@code filename}, one value per line.
	 *
	 * @param pool     pool the sort task runs on
	 * @param filename part file to create (overwritten if it exists)
	 * @param array    buffer holding the chunk
	 * @param start    first index of the chunk (inclusive)
	 * @param end      end index of the chunk (exclusive)
	 * @throws Exception if sorting fails or the part file cannot be written
	 */
	static void doPartitionSort(ForkJoinPool pool, String filename, int[] array, int start, int end) throws Exception {
		pool.invoke(new ArrayMergerSortTask(array, start, end));
		try (PrintWriter pw = new PrintWriter(filename)) {
			for (int k = start; k < end; k++) {
				pw.println(array[k]);
			}
			// PrintWriter never throws on write failure; check so a truncated part is not merged.
			if (pw.checkError()) {
				throw new IOException("failed to write partition file " + filename);
			}
		}
	}
}



/***
 * Fork/join merge sort of the half-open range {@code [lo, hi)} of an int array.
 * <p>
 * Ranges shorter than {@link #THRESHOLD} are sorted directly with
 * {@link Arrays#sort(int[], int, int)}. Longer ranges are split in half, both
 * halves are sorted in parallel, and the two sorted halves are merged in place.
 *
 *@author dongsheng
 *@date 2019/1/18 22:58
 *@version 1.0.0
 */
public class ArrayMergerSortTask extends RecursiveAction {

	/** Ranges with fewer elements than this are sorted sequentially. */
	static final int THRESHOLD = 1000;

	/** Array being sorted; shared with, and mutated by, every subtask. */
	final int[] array;
	/** Inclusive start and exclusive end of this task's range. */
	final int lo, hi;

	ArrayMergerSortTask(int[] array, int lo, int hi) {
		this.array = array;
		this.lo = lo;
		this.hi = hi;
	}

	/** Creates a task that sorts the entire array. */
	ArrayMergerSortTask(int[] array) {
		this(array, 0, array.length);
	}

	@Override
	protected void compute() {
		if (hi - lo < THRESHOLD) {
			// Small range: further splitting would cost more than it saves.
			sortSequentially(lo, hi);
			return;
		}
		// Unsigned shift gives the correct midpoint even if lo + hi overflows int.
		int middle = (lo + hi) >>> 1;
		ArrayMergerSortTask lower = new ArrayMergerSortTask(array, lo, middle);
		ArrayMergerSortTask upper = new ArrayMergerSortTask(array, middle, hi);
		invokeAll(lower, upper);
		merge(lo, middle, hi);
	}

	/** Sorts {@code array[lo, hi)} on the current thread using the JDK sort. */
	void sortSequentially(int lo, int hi) {
		Arrays.sort(array, lo, hi);
	}

	/**
	 * Merges the sorted runs {@code array[lo, mid)} and {@code array[mid, hi)} in place.
	 * Only the left run is copied out. Once that copy is exhausted, the remaining
	 * right-run elements are already in their final positions.
	 */
	void merge(int lo, int mid, int hi) {
		int[] left = Arrays.copyOfRange(array, lo, mid);
		int li = 0;
		int ri = mid;
		int out = lo;
		while (li < left.length) {
			boolean takeLeft = ri == hi || left[li] < array[ri];
			if (takeLeft) {
				array[out] = left[li];
				li++;
			} else {
				array[out] = array[ri];
				ri++;
			}
			out++;
		}
	}

	
}


/***
 * Fork/join task that merges a contiguous run {@code [lo, hi)} of already-sorted
 * part files (one int per line) into a single sorted file.
 * <p>
 * A run of more than two files is split in half. Each half is merged recursively
 * into an intermediate file named {@code filename + "-1"} / {@code filename + "-2"},
 * and those two results are then merged into {@code filename}. The task's result is
 * the name of the file that holds the merged data. That is {@code filename}, except
 * when the run contains exactly one file: then the input file itself is returned
 * unchanged.
 * <p>
 * I/O failures are rethrown as {@link java.io.UncheckedIOException}, so the task
 * completes exceptionally instead of reporting a file that was never fully written.
 *
 *@author dongsheng
 *@date 2019/1/18 22:58
 *@version 1.0.0
 */
public class MergerFileSortTask extends RecursiveTask<String> {

	/** Paths of sorted part files; only indices {@code [lo, hi)} belong to this task. */
	List<String> partFiles;
	int lo, hi;

	/** Output file this task writes its merged result to. */
	String filename;

	public MergerFileSortTask(List<String> partFiles, int lo, int hi, String filename) {
		super();
		this.partFiles = partFiles;
		this.lo = lo;
		this.hi = hi;
		this.filename = filename;
	}

	/** Creates a task that merges every file in {@code partFiles} into {@code filename}. */
	public MergerFileSortTask(List<String> partFiles, String filename) {
		this(partFiles, 0, partFiles.size(), filename);
	}

	@Override
	protected String compute() {
		int fileCount = hi - lo;
		try {
			if (fileCount > 2) {
				int mid = lo + fileCount / 2;
				MergerFileSortTask left = new MergerFileSortTask(partFiles, lo, mid, this.filename + "-1");
				MergerFileSortTask right = new MergerFileSortTask(partFiles, mid, hi, this.filename + "-2");
				// Merge both halves in parallel; invokeAll rethrows any subtask failure.
				invokeAll(left, right);
				mergerFile(left.join(), right.join());
			} else if (fileCount == 2) {
				mergerFile(partFiles.get(lo), partFiles.get(lo + 1));
			} else if (fileCount == 1) {
				// A single sorted file needs no merging; hand it back as-is.
				return partFiles.get(lo);
			} else {
				// Empty run: still create the output so the returned name refers to a real file.
				new PrintWriter(filename).close();
			}
		} catch (IOException e) {
			throw new java.io.UncheckedIOException("merging into " + filename + " failed", e);
		}
		return this.filename;
	}

	/**
	 * Performs a two-way merge of the sorted files {@code f1} and {@code f2} into
	 * {@link #filename}. When values are equal, the line from {@code f1} is written
	 * first. Lines are copied verbatim.
	 *
	 * @throws IOException if an input cannot be read or the output cannot be written
	 */
	private void mergerFile(String f1, String f2) throws IOException {
		try (BufferedReader reader1 = new BufferedReader(new FileReader(f1));
				BufferedReader reader2 = new BufferedReader(new FileReader(f2));
				PrintWriter pw = new PrintWriter(filename)) {
			String s1 = reader1.readLine();
			String s2 = reader2.readLine();
			// Parsed heads of each input; only meaningful while the matching line is non-null.
			int d1 = s1 == null ? 0 : Integer.parseInt(s1);
			int d2 = s2 == null ? 0 : Integer.parseInt(s2);

			while (s1 != null && s2 != null) {
				if (d1 <= d2) {
					pw.println(s1);
					s1 = reader1.readLine();
					if (s1 != null) {
						d1 = Integer.parseInt(s1);
					}
				} else {
					pw.println(s2);
					s2 = reader2.readLine();
					if (s2 != null) {
						d2 = Integer.parseInt(s2);
					}
				}
			}

			// At most one input still has lines; copy its remainder unchanged.
			for (; s1 != null; s1 = reader1.readLine()) {
				pw.println(s1);
			}
			for (; s2 != null; s2 = reader2.readLine()) {
				pw.println(s2);
			}

			// PrintWriter never throws on write failure; detect it explicitly.
			if (pw.checkError()) {
				throw new IOException("failed to write merged file " + filename);
			}
		}
	}
}
RecursiveTask
RecursiveTaskはRecursiveActionと似ていますが、各サブタスクが処理後に戻り値を返し、最終的にすべてのサブタスクの結果をjoin（マージ）して全体の結果とします。
ソースコード
/*
 *
 *
 *
 *
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

/**
 * A recursive result-bearing {@link ForkJoinTask}.
 *
 * <p>For a classic example, here is a task computing Fibonacci numbers:
 *
 * <pre> {@code
 * class Fibonacci extends RecursiveTask<Integer> {
 *   final int n;
 *   Fibonacci(int n) { this.n = n; }
 *   protected Integer compute() {
 *     if (n <= 1)
 *       return n;
 *     Fibonacci f1 = new Fibonacci(n - 1);
 *     f1.fork();
 *     Fibonacci f2 = new Fibonacci(n - 2);
 *     return f2.compute() + f1.join();
 *   }
 * }}</pre>
 *
 * However, besides being a dumb way to compute Fibonacci functions
 * (there is a simple fast linear algorithm that you'd use in
 * practice), this is likely to perform poorly because the smallest
 * subtasks are too small to be worthwhile splitting up. Instead, as
 * is the case for nearly all fork/join applications, you'd pick some
 * minimum granularity size (for example 10 here) for which you always
 * sequentially solve rather than subdividing.
 *
 * @param <V> the type of the result computed by the task
 * @since 1.7
 * @author Doug Lea
 */
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
    private static final long serialVersionUID = 5232453952276485270L;

    /**
     * The result of the computation.
     */
    V result;

    /**
     * The main computation performed by this task.
     * @return the result of the computation
     */
    protected abstract V compute();

    @Override
    public final V getRawResult() {
        return result;
    }

    @Override
    protected final void setRawResult(V value) {
        result = value;
    }

    /**
     * Implements execution conventions for RecursiveTask: stores the value
     * returned by {@link #compute()} and reports normal completion.
     */
    @Override
    protected final boolean exec() {
        result = compute();
        return true;
    }
}