[JAVA]フォーク/連結フレーム
フォークフレーム
fork/結合フレームワークは、並列化可能なタスクをより小さなワークスペースに再帰的に分割し、各サブタスクの結果を統合して完全な結果を生成するように設計されている.
RecursiveTask
スレッドプールを使用するには、RecursiveTask\<R>
のサブクラスを作成する必要があります.
Rは、並列化された技術によって生成された結果形式または結果がない場合のRecursiveAction
形式である.RecursiveTask
を定義するためには、compute
を実装する必要がある.
compute
compute
の方法は、技術をサブ技術に分割する論理と、再分割できない場合に単一のサブ技術結果を生成するアルゴリズムを定義する.
ex)if (테스크가 충분히 작거나 더 이상 분할할 수 없으면) {
순차적으로 테스크 계산
} else {
테스크를 두 서브테스크로 분할
테스크가 다시 서브테스크로 분할되도록 이 메서드 재귀 호출
모든 서브테스크의 연산이 완료될 때까지 기다림
각 서브테스크의 결과 합침
}
例
範囲の数値を加算する例を見てみましょう.
ForkJoinSumCalculatorimport java.util.concurrent.RecursiveTask;
// RecursiveTask를 상속받아 포크조인 프레임워크에서 사용할 테스크 생성
public class ForkJoinSumCalculator extends RecursiveTask<Long> {
private final long[] numbers; // 더할 숫자 배열
private final int start; // 이 서브테스크에서 처리할 배열의 초기 위치와 최종 위치
private final int end;
public static final long THRESHOLD = 10_000; // 이 값 이하의 서브테스크는 더 이상 분할 x
// 메인 테스크를 생성할 때 사용할 공개 생성자
public ForkJoinSumCalculator(long[] numbers) {
this(numbers, 0, numbers.length);
}
// 메인 테스트의 서브테스크를 재귀적으로 만들 때 사용할 private 생성자
private ForkJoinSumCalculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
int length = end - start; // 이 테스크에서 더할 배열의 길이
if (length <= THRESHOLD) {
// 기준값과 같거나 작으면 순차적으로 결과 계산
return computeSequentially();
}
// 배열의 첫 번째 절반을 더하도록 서브테스크 생성
ForkJoinSumCalculator leftTask =
new ForkJoinSumCalculator(numbers, start, start+length/2);
// ForkJoinPool의 다른 스레드로 새로 생성한 테스크를 비동기로 실행
leftTask.fork();
// 배열의 나머지 절반 더하는 서브테스크 생성
ForkJoinSumCalculator rightTask =
new ForkJoinSumCalculator(numbers, start, start+length/2);
// 두번째 서브테스크 동기 실행. 추가 분할 발생할 수 있음
Long rightResult = rightTask.compute();
// 첫번째 서브테스크 결과를 읽거나 아직 결과가 없으면 기다림
Long leftResult = leftTask.join();
// 두 서브테스크의 결과를 조합한 값이 이 테스크의 결과값
return rightResult + leftResult;
}
// 더 분할할 수 없을 때 서브테스크의 결과를 계산하는 알고리즘
private Long computeSequentially() {
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
}
nへの自然数加算を並列に実行する方法を直感的に示した.次の図に示すように、必要な数の配列をユーザーに渡すことができます.long[] numbers = LongStream.rangeClosed(1, n).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
new ForkJoinPool().invoke(task);
通常、アプリケーションは2つ以上のForkJoinPool
を使用しません.
すなわち、一度だけインスタンス化し、静的フィールドに格納します.
ForkJoinSumCalculatorの実行
ForkJoinSumCalculator
がForkJoinPool
に転送されると、プールスレッドは、ForkJoinSumCalculator
のcompute
を実行して動作を実行する.
しかし、十分に理解していないまま使用すると、かえって性能が低下してしまう.
たとえば、この方法を使用すると、ストリーム全体がlong[]に変換されるため、パラレルストリームを使用するよりもパフォーマンスが劣る場合があります.// 스트림을 long[]로 변환하는 과정에서 성능 저하가 발생
long[] numbers = LongStream.rangeClosed(1, n).toArray();
フォークリフト/ジョイントフレーム使用上の注意事項
if (테스크가 충분히 작거나 더 이상 분할할 수 없으면) {
순차적으로 테스크 계산
} else {
테스크를 두 서브테스크로 분할
테스크가 다시 서브테스크로 분할되도록 이 메서드 재귀 호출
모든 서브테스크의 연산이 완료될 때까지 기다림
각 서브테스크의 결과 합침
}
import java.util.concurrent.RecursiveTask;
// RecursiveTask를 상속받아 포크조인 프레임워크에서 사용할 테스크 생성
public class ForkJoinSumCalculator extends RecursiveTask<Long> {
private final long[] numbers; // 더할 숫자 배열
private final int start; // 이 서브테스크에서 처리할 배열의 초기 위치와 최종 위치
private final int end;
public static final long THRESHOLD = 10_000; // 이 값 이하의 서브테스크는 더 이상 분할 x
// 메인 테스크를 생성할 때 사용할 공개 생성자
public ForkJoinSumCalculator(long[] numbers) {
this(numbers, 0, numbers.length);
}
// 메인 테스트의 서브테스크를 재귀적으로 만들 때 사용할 private 생성자
private ForkJoinSumCalculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
int length = end - start; // 이 테스크에서 더할 배열의 길이
if (length <= THRESHOLD) {
// 기준값과 같거나 작으면 순차적으로 결과 계산
return computeSequentially();
}
// 배열의 첫 번째 절반을 더하도록 서브테스크 생성
ForkJoinSumCalculator leftTask =
new ForkJoinSumCalculator(numbers, start, start+length/2);
// ForkJoinPool의 다른 스레드로 새로 생성한 테스크를 비동기로 실행
leftTask.fork();
// 배열의 나머지 절반 더하는 서브테스크 생성
ForkJoinSumCalculator rightTask =
new ForkJoinSumCalculator(numbers, start, start+length/2);
// 두번째 서브테스크 동기 실행. 추가 분할 발생할 수 있음
Long rightResult = rightTask.compute();
// 첫번째 서브테스크 결과를 읽거나 아직 결과가 없으면 기다림
Long leftResult = leftTask.join();
// 두 서브테스크의 결과를 조합한 값이 이 테스크의 결과값
return rightResult + leftResult;
}
// 더 분할할 수 없을 때 서브테스크의 결과를 계산하는 알고리즘
private Long computeSequentially() {
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
}
long[] numbers = LongStream.rangeClosed(1, n).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
new ForkJoinPool().invoke(task);
// 스트림을 long[]로 변환하는 과정에서 성능 저하가 발생
long[] numbers = LongStream.rangeClosed(1, n).toArray();
テクノロジーにjoinメソッドを呼び出すと、テクノロジー生成の結果が準備されるまで呼び出し者がブロックされます.
RecursiveTask
内では、ForkJoinPool
のinvoke
メソッドを使用するべきではありません.compute
またはfork
メソッドは、invoke
は、シーケンスコードで並列計算を開始する場合にのみinvokeを使用します.サブテクノロジーでforkメソッドを呼び出すことで、
ForkJoinPool
のスケジュールを調整することができる.fork
メソッドを呼び出すのは自然なようですが、1つのタスク呼び出しcompute
の方が効果的です.フォーク/結合フレームワークを使用した並列計算はデバッグが困難です
マルチコアでフォークリフト/連結フレームを使用することは、必ずしも順序処理よりも速くはありません.
任務を盗み取る.
カーネルの数にかかわらず、適切なサイズで区切られたテクノロジーに注目する必要があります.
理論的には、ワークロードをカーネル数と同じ並列タスクに分割すると、CPIカーネルはすべてこれらのタスクを実行し、各サイズの同じタスクは同じ時間に終了する可能性がありますが、現実的には、ワーク交換を完了する時間はそれぞれ異なる場合があります.
宿題を盗む方法でこの問題を解決する.
ForkJoinPool
のすべてのスレッドを公平に分割します.各スレッドは、割り当てられたテクノロジーを含む2つの接続リストを参照し、タスクが完了するたびに、キューのヘッダから異なるテクノロジーを取得してタスクを処理します.この場合、1つのスレッドは、別のスレッドよりも早く割り当てられた技術を処理することができる.このとき、タスクを完了したスレッドは、アイドル状態に変換するのではなく、他のスレッドキューの末尾からタスクを盗みます.
したがって、ワークスレッド間のワークロードを同じレベルに維持するには、テクノロジーのサイズを小さく分割する必要があります.
Reference
Reference
この問題について([JAVA]フォーク/連結フレーム), 我々は、より多くの情報をここで見つけました https://velog.io/@cham/JAVA-포크조인-프레임워크テキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol