シリーズの同時プログラミングをマスターする-10.Fork/Joinフレームワーク
4954 ワード
高同時性、高可用性アーキテクチャの把握
第二課同時プログラミング
この授業から同時プログラミングの内容を学びます.主に、同時プログラミングの基礎知識、ロック、メモリモデル、スレッドプール、各種同時コンテナの使用について説明します.
第十節Fork/Joinフレームワーク
基本思想.
きほんげんり
さぎょう盗難アルゴリズム
アルゴリズムの考え方:各スレッドには独自のWorkQueueがあり、両端キュー です.キューは3つの機能をサポートし、push、pop、poll push/popはキューの所有者スレッドでのみ使用でき、pollは他のスレッドで を呼び出すことができる.で区切られたサブタスクがforkを呼び出すと、スレッドのキューに がpushされます.デフォルトでは、スレッドは自分のキューからタスクを取得し、 を実行する.自分のキューが空である場合、poll盗みタスク をランダムに別のスレッドのキューの末尾から呼び出す.
ForkJoinPoolオブジェクトの作成呼び出し ForkJoinPool内部メソッドcommonPool() を呼び出す呼び出しコンストラクタ
ほとんどの場合、私たちは
以下はForkJoinTaskの3つのコアメソッドです. fork()は、大きなタスクが小さなタスクに分割された後、小さなタスクを呼び出すfork()メソッドは、スレッドプールにタスクを入れることができる である. join()は、小さなタスクのjoin()メソッドを呼び出してタスクの戻り結果を待つ.サブタスクが異常を投げ出すと、joinも異常を投げ出す.メソッド を呼び出す必要がある. invoke()は、現在のスレッドでタスク を同期実行する.
第二課同時プログラミング
この授業から同時プログラミングの内容を学びます.主に、同時プログラミングの基礎知識、ロック、メモリモデル、スレッドプール、各種同時コンテナの使用について説明します.
第十節Fork/Joinフレームワーク
Fork/Join
ForkJoinPool
基本思想.
ThreadPoolExecutor
スレッドプール内の各タスクは、単一のスレッドによって独立して処理されます.非常に時間のかかるタスクが発生すると、スレッドプールに1つのスレッドだけがこの大きなタスクを処理していますが、他のスレッドは空いています.これはCPU負荷のバランシングを引き起こす.ForkJoinPool
で、1つの大きなタスクを複数の小さなタスクに分割し、forkを使用して他のスレッドに小さなタスクを配布して同時に処理し、joinを使用して小さなタスクの実行結果をまとめます.マルチプロセッサの利点を利用して、使用可能なすべての処理能力を集中して実行効率を向上させることが、分而治の考え方の並列実現です.きほんげんり
ForkJoinPool
もExecutorService
インタフェースの実装クラスである.ForkJoinPool
の2つのコアは、ワーク・スティリンク・アルゴリズムとを区別して処理することです.さぎょう盗難アルゴリズム
アルゴリズムの考え方:
ForkJoinPoolオブジェクトの作成
Executors
ツールクラス// parallelism
public static ExecutorService newWorkStealingPool(int parallelism);
// Runtime.getRuntime().availableProcessors()
public static ExecutorService newWorkStealingPool();
public static ForkJoinPool commonPool();
ForkJoinTask
ほとんどの場合、私たちは
ForkJoinTask
からFormJoinPool
に提出しています.以下はForkJoinTaskの3つのコアメソッドです.
quietlyJoin()
は、異常を投げ出すも結果を返さず、getException()
およびgetResult()
RecursiveAction
およびRecursiveTask
通常、ForkJoinTaskを直接使用するのではなく、2つの抽象クラスを使用します.RecursiveAction
:戻り値のないタスクRecursiveTask
:戻り値のあるタスクpublic class RecursiveActionTeset {
static class Sorter extends RecursiveAction {
public static void sort(long[] array) {
ForkJoinPool.commonPool().invoke(new Sorter(array, 0, array.length));
}
private final long[] array;
private final int lo, hi;
public Sorter(long[] array, int lo, int hi) {
this.array = array;
this.lo = lo;
this.hi = hi;
}
private static final int THRESHOLD = 1000;
//
@Override
protected void compute() {
if (hi - lo < 1000) {
Arrays.sort(array, lo, hi);
} else {
int mid = (hi + lo) >>> 1;
// 1000 ,
Sorter left = new Sorter(array, lo, mid);
Sorter right = new Sorter(array, mid, hi);
invokeAll(left, right);
merge(lo, mid, hi);
}
}
private void merge(int lo, int mid, int hi) {
long[] buff = Arrays.copyOfRange(array, lo, mid);
for (int i = 0, j = lo, k = mid; i < buff.length; i++) {
if (k == hi || buff[i] < array[k]) {
array[j] = buff[i++];
} else {
array[j] = array[k++];
}
}
}
public static void main(String[] args) {
long[] array = new Random().longs(100_0000).toArray();
Sorter.sort(array);
System.out.println(Arrays.toString(array));
}
}
}
public class BatchInsertTask extends RecursiveTask {
//
List records;
public BatchInsertTask(List records) {
this.records = records;
}
@Override
protected Integer compute() {
// 5,
if (records.size() < 5) {
return computeDirectly();
} else {
// 5,
int size = records.size();
//
BatchInsertTask aTask = new BatchInsertTask(records.subList(0, size / 2));
//
BatchInsertTask bTask = new BatchInsertTask(records.subList(size / 2, records.size()));
//
invokeAll(aTask, bTask);
//
return aTask.join() + bTask.join();
}
}
/**
*
*/
private int computeDirectly() {
try {
Thread.sleep((long) (records.size() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(" :" + Arrays.toString(records.toArray()));
return records.size();
}
}