1 Gメモリのみで、10 Gのファイル内のデータをソートする方法
9072 ワード
考え方
1. マージ(外部ソート) 2. マルチスレッド(RecursiveTask)
テスト用ファイルの生成コード
ソート・コード
ソースコード
1. マージ(外部ソート) 2. マルチスレッド(RecursiveTask)
テスト用ファイルの生成コード
/**
 * Test-data generator: writes one million random {@code int} values, one per line,
 * to the text file at {@code filePath} and prints the start timestamp and the
 * elapsed milliseconds to standard output.
 *
 * @author dongsheng
 * @date 2019/1/18 22:58
 * @version 1.0.0
 */
public class GenerateNumber {
    private static final String filePath = "F:/file_test/data1.txt";

    /**
     * Generates the data file.
     *
     * @param args unused
     * @throws IOException if the output file cannot be opened
     */
    public static void main(String[] args) throws IOException {
        final int total = 1_000_000;
        final int flushInterval = 10000;
        Random generator = new Random();
        try (PrintWriter writer = new PrintWriter(new File(filePath))) {
            long startedAt = System.currentTimeMillis();
            System.out.println("beginTime:" + startedAt);
            int written = 0;
            while (written < total) {
                writer.println(generator.nextInt());
                // Push buffered lines to disk periodically.
                if (written % flushInterval == 0) {
                    writer.flush();
                }
                written++;
            }
            long elapsedMillis = System.currentTimeMillis() - startedAt;
            // Despite the label, the value printed is the elapsed time in milliseconds.
            System.out.println("endTime:" + elapsedMillis);
        }
    }
}
ソート・コード
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
/**
 * External merge sort for a text file that holds one integer per line.
 *
 * <p>The input is read in chunks of at most {@code size} values. Each chunk is sorted
 * in memory by an {@link ArrayMergerSortTask} and written to its own part file; the
 * part files are then merged into the final output by a {@link MergerFileSortTask}.
 * When the whole input fits into a single chunk (including an empty input), the chunk
 * is written straight to the final output file, so the output always exists after a
 * successful run.
 *
 * @author dongsheng
 * @date 2019/1/18 22:58
 * @version 1.0.0
 */
public class SortByForkjoin {
    private static final String filePath = "F:/file_test/data1.txt";
    private static final String afterFilePath = "F:/file_test/data1-sort.txt";

    /**
     * Sorts the file at {@code filePath} into {@code afterFilePath}.
     *
     * @param args unused
     * @throws Exception if reading, sorting, writing or merging fails
     */
    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            // Maximum number of values held in memory (and per part file) at once.
            int size = 100_000;
            int[] array = new int[size];
            int i = 0;
            int partition = 0;
            // Names of the sorted part files, in the order they were written.
            List<String> partFiles = new ArrayList<>();
            try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    // Spill a full buffer only once another value arrives. This way an
                    // input that fits in one chunk is never split off into a part file.
                    if (i == size) {
                        String filename = "F:/file_test/data1-part-" + partition++ + ".txt";
                        doPartitionSort(pool, filename, array, 0, size);
                        partFiles.add(filename);
                        i = 0;
                    }
                    array[i++] = Integer.parseInt(line);
                }
            }
            if (partFiles.isEmpty()) {
                // Everything (possibly nothing) fit in memory: sort directly into the output.
                doPartitionSort(pool, afterFilePath, array, 0, i);
            } else {
                // At least one value is still buffered here, so this yields two or more parts.
                String filename = "F:/file_test/data1-part-" + partition++ + ".txt";
                doPartitionSort(pool, filename, array, 0, i);
                partFiles.add(filename);
                MergerFileSortTask mtask = new MergerFileSortTask(partFiles, afterFilePath);
                pool.submit(mtask);
                mtask.get();
            }
        } finally {
            pool.shutdown();
        }
    }

    /**
     * Sorts {@code array[start, end)} in place on the given pool and writes the sorted
     * values, one per line, to {@code filename}.
     *
     * @param pool pool that runs the sort task
     * @param filename file to write the sorted values to
     * @param array buffer holding the values
     * @param start index of the first value to sort (inclusive)
     * @param end index after the last value to sort (exclusive)
     * @throws Exception if the sort task fails or the file cannot be written
     */
    static void doPartitionSort(ForkJoinPool pool, String filename, int[] array, int start, int end) throws Exception {
        ArrayMergerSortTask task = new ArrayMergerSortTask(array, start, end);
        pool.submit(task);
        task.get();
        try (PrintWriter pw = new PrintWriter(filename)) {
            for (int i = start; i < end; i++) {
                pw.println(array[i]);
            }
            // PrintWriter never throws on write failures; surface them explicitly.
            if (pw.checkError()) {
                throw new IOException("Failed to write sorted partition to " + filename);
            }
        }
    }
}
/**
 * Fork/join action that sorts the half-open range {@code [lo, hi)} of an {@code int}
 * array in place. Ranges shorter than {@link #THRESHOLD} are sorted directly; longer
 * ranges are split in two, sorted as subtasks, and merged.
 *
 * @author dongsheng
 * @date 2019/1/18 22:58
 * @version 1.0.0
 */
public class ArrayMergerSortTask extends RecursiveAction {
    /** Ranges with fewer elements than this are sorted sequentially. */
    static final int THRESHOLD = 1000;
    final int[] array;
    final int lo, hi;

    /**
     * @param array array to sort in place
     * @param lo first index of the range (inclusive)
     * @param hi end index of the range (exclusive)
     */
    ArrayMergerSortTask(int[] array, int lo, int hi) {
        this.array = array;
        this.lo = lo;
        this.hi = hi;
    }

    /** Creates a task covering the whole array. */
    ArrayMergerSortTask(int[] array) {
        this(array, 0, array.length);
    }

    @Override
    protected void compute() {
        if (hi - lo >= THRESHOLD) {
            // Large range: sort both halves as subtasks, then merge them.
            int middle = (lo + hi) >>> 1;
            ArrayMergerSortTask lower = new ArrayMergerSortTask(array, lo, middle);
            ArrayMergerSortTask upper = new ArrayMergerSortTask(array, middle, hi);
            invokeAll(lower, upper);
            merge(lo, middle, hi);
            return;
        }
        sortSequentially(lo, hi);
    }

    /** Sorts {@code array[lo, hi)} with the JDK's built-in sort. */
    void sortSequentially(int lo, int hi) {
        Arrays.sort(array, lo, hi);
    }

    /**
     * Merges the sorted runs {@code array[lo, mid)} and {@code array[mid, hi)} in place.
     * Only the lower run is copied; once it is exhausted, the remaining upper-run
     * elements are already in their final positions.
     */
    void merge(int lo, int mid, int hi) {
        int[] lowerRun = Arrays.copyOfRange(array, lo, mid);
        int lowerPos = 0;
        int upperPos = mid;
        int dest = lo;
        while (lowerPos < lowerRun.length) {
            if (upperPos == hi || lowerRun[lowerPos] < array[upperPos]) {
                array[dest] = lowerRun[lowerPos++];
            } else {
                array[dest] = array[upperPos++];
            }
            dest++;
        }
    }
}
/**
 * Fork/join task that merges the sorted part files {@code partFiles[lo, hi)} into one
 * sorted file. Each part file holds one integer per line in ascending order.
 *
 * <p>The task's result is the name of the file that holds the merged data:
 * <ul>
 *   <li>two or more files: they are merged into {@code filename}, which is returned;
 *       intermediate results are written to {@code filename + "-1"} and
 *       {@code filename + "-2"} (recursively) and are not deleted;</li>
 *   <li>exactly one file: nothing is written and that part file's name is returned;</li>
 *   <li>an empty range: nothing is written and {@code filename} is returned.</li>
 * </ul>
 *
 * <p>I/O failures are rethrown as {@link java.io.UncheckedIOException}, so a failed
 * merge fails the task instead of being reported as a success.
 *
 * @author dongsheng
 * @date 2019/1/18 22:58
 * @version 1.0.0
 */
public class MergerFileSortTask extends RecursiveTask<String> {
    List<String> partFiles;
    int lo, hi;
    String filename;

    /**
     * @param partFiles names of the sorted part files
     * @param lo index of the first part file to merge (inclusive)
     * @param hi index after the last part file to merge (exclusive)
     * @param filename name of the file the merged result is written to
     */
    public MergerFileSortTask(List<String> partFiles, int lo, int hi, String filename) {
        super();
        this.partFiles = partFiles;
        this.lo = lo;
        this.hi = hi;
        this.filename = filename;
    }

    /**
     * Creates a task that merges every file in {@code partFiles}.
     *
     * @param partFiles names of the sorted part files
     * @param filename name of the file the merged result is written to
     */
    public MergerFileSortTask(List<String> partFiles, String filename) {
        this(partFiles, 0, partFiles.size(), filename);
    }

    @Override
    protected String compute() {
        int fileCount = hi - lo;
        if (fileCount == 1) {
            // A single file is already sorted; hand it to the parent as is.
            return partFiles.get(lo);
        }
        if (fileCount < 1) {
            return filename;
        }
        try {
            if (fileCount == 2) {
                mergerFile(partFiles.get(lo), partFiles.get(lo + 1));
            } else {
                // More than two files: merge each half first, then merge the two results.
                int mid = fileCount / 2;
                MergerFileSortTask task1 = new MergerFileSortTask(partFiles, lo, lo + mid, filename + "-1");
                MergerFileSortTask task2 = new MergerFileSortTask(partFiles, lo + mid, hi, filename + "-2");
                // Fork one half and compute the other in this thread, then join.
                task1.fork();
                String second = task2.compute();
                String first = task1.join();
                mergerFile(first, second);
            }
        } catch (IOException e) {
            throw new java.io.UncheckedIOException("Failed to merge part files into " + filename, e);
        }
        return filename;
    }

    /**
     * Merges two sorted files into {@code filename}, one integer per line.
     *
     * @param f1 name of the first sorted input file
     * @param f2 name of the second sorted input file
     * @throws IOException if an input cannot be read or the output cannot be written
     */
    private void mergerFile(String f1, String f2) throws IOException {
        try (BufferedReader reader1 = new BufferedReader(new FileReader(f1));
             BufferedReader reader2 = new BufferedReader(new FileReader(f2));
             PrintWriter pw = new PrintWriter(filename)) {
            String s1 = reader1.readLine();
            String s2 = reader2.readLine();
            // Parsed values of the current lines; only meaningful while the line is non-null.
            int d1 = s1 == null ? 0 : Integer.parseInt(s1);
            int d2 = s2 == null ? 0 : Integer.parseInt(s2);
            while (s1 != null && s2 != null) {
                if (d1 <= d2) {
                    pw.println(s1);
                    s1 = reader1.readLine();
                    if (s1 != null) {
                        d1 = Integer.parseInt(s1);
                    }
                } else {
                    pw.println(s2);
                    s2 = reader2.readLine();
                    if (s2 != null) {
                        d2 = Integer.parseInt(s2);
                    }
                }
            }
            // At most one input still has lines left; copy them through unchanged.
            for (; s1 != null; s1 = reader1.readLine()) {
                pw.println(s1);
            }
            for (; s2 != null; s2 = reader2.readLine()) {
                pw.println(s2);
            }
            // PrintWriter never throws on write failures; surface them explicitly.
            if (pw.checkError()) {
                throw new IOException("Failed to write merged output to " + filename);
            }
        }
    }
}
RecursiveTask
RecursiveTask は RecursiveAction と似ているが、各サブタスクが処理後に戻り値を返し、最終的にすべてのサブタスクの戻り値を join(マージ)して結果とする点が異なる。
ソースコード
/*
*
*
*
*
*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent;
/**
 * A recursive result-bearing {@link ForkJoinTask}.
 *
 * <p>For a classic example, here is a task computing Fibonacci numbers:
 *
 * <pre> {@code
 * class Fibonacci extends RecursiveTask<Integer> {
 *   final int n;
 *   Fibonacci(int n) { this.n = n; }
 *   protected Integer compute() {
 *     if (n <= 1)
 *       return n;
 *     Fibonacci f1 = new Fibonacci(n - 1);
 *     f1.fork();
 *     Fibonacci f2 = new Fibonacci(n - 2);
 *     return f2.compute() + f1.join();
 *   }
 * }}</pre>
 *
 * <p>However, besides being a dumb way to compute Fibonacci functions
 * (there is a simple fast linear algorithm that you'd use in
 * practice), this is likely to perform poorly because the smallest
 * subtasks are too small to be worthwhile splitting up. Instead, as
 * is the case for nearly all fork/join applications, you'd pick some
 * minimum granularity size (for example 10 here) for which you always
 * sequentially solve rather than subdividing.
 *
 * @param <V> the type of the result computed by this task
 * @since 1.7
 * @author Doug Lea
 */
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
    private static final long serialVersionUID = 5232453952276485270L;

    /**
     * The result of the computation.
     */
    V result;

    /**
     * The main computation performed by this task.
     * @return the result of the computation
     */
    protected abstract V compute();

    /**
     * Returns the stored result.
     * @return the value last stored in {@code result}
     */
    @Override
    public final V getRawResult() {
        return result;
    }

    /**
     * Stores the given value as the result.
     * @param value the value to store
     */
    @Override
    protected final void setRawResult(V value) {
        result = value;
    }

    /**
     * Implements execution conventions for RecursiveTask: runs {@link #compute()},
     * stores its return value as the result, and returns {@code true}.
     */
    @Override
    protected final boolean exec() {
        result = compute();
        return true;
    }
}