JAVAパラレルフレーム:Fork/Join

5012 ワード

一、背景
現在、プロセッサのコア数は大幅に増加していますが、一般的なアプリケーションでは、タスクの同時処理がそれほど多くないため、タスク別の同時処理ではプロセッサリソースを十分に利用することはできません.このような現状に基づいて,1つのタスクを複数のユニットに分割し,各ユニットがそれぞれ実行され,最後に各ユニットを統合した結果を考える.
Fork/Joinフレームワークは、JAVA 7が提供する並行してタスクを実行するためのフレームワークであり、大きなタスクをいくつかの小さなタスクに分割し、最終的に各小さなタスクの結果をまとめて大きなタスクの結果を得るフレームワークである.
二、作業窃盗アルゴリズム
あるスレッドが他のキューからタスクを盗んで実行することを指します.使用するシーンは、1つの大きなタスクを複数の小さなタスクに分割し、スレッド間の競争を減らすために、これらのサブタスクをそれぞれ異なるキューに配置し、各キューにはキュー内のタスクを実行するために個別のスレッドがあり、スレッドとキューが1つずつ対応しています.しかし、Aスレッドは自分のキューのタスクを処理し、Bスレッドのキューには多くのタスクが処理される場合があります.Aは情熱的なスレッドで、過去に手伝いたいのですが、2つのスレッドが同じキューにアクセスすると競争が生じるので、Aは両端キューの末尾からタスクを実行する方法を考えました.Bスレッドは常に両端のキューのヘッダからタスクを実行し(タスクは独立した小さなタスク)、Aスレッドは泥棒がBスレッドのものを盗んでいるような感じがします.
ワーク・盗難アルゴリズムの利点:
スレッドを用いて並列計算を行い,スレッド間の競合を低減した.
ワーク・盗難アルゴリズムの欠点:
1.両端キューにタスクが1つしかない場合、スレッド間で競合が発生します.
2、盗難アルゴリズムは、複数のスレッドと複数の両端キューを作成するなど、より多くのシステムリソースを消費する.
三、フレーム設計
Fork/Joinの2つの重要なクラス:
1、ForkJoinTask:このフレームワークを使用するには、forkおよびjoin操作をタスクで実行するメカニズムを提供するForkJoinタスクを作成する必要があります.一般的に、ForkJoinTaskクラスを直接継承する必要はありません.そのサブクラスを継承する必要があります.そのサブクラスは2つあります.
a、RecursiveAction:結果を返さないタスクに使用します.
b、RecursiveTask:結果を返すタスクに使用します.
2、ForkJoinPool:タスクForkJoinTaskはForkJoinPoolで実行する必要がある.
 1 package test; 2  3 import java.util.concurrent.ExecutionException; 4 import java.util.concurrent.ForkJoinPool; 5 import java.util.concurrent.Future; 6 import java.util.concurrent.RecursiveTask; 7  8  9 public class CountTask extends RecursiveTask10 {11     private static final long serialVersionUID = 1L;12     //  13     private static final int THRESHOLD = 2;14     private int start;15     private int end;16     17     public CountTask(int start, int end)18     {19         this.start = start;20         this.end = end;21     }22 23     @Override24     protected Integer compute()25     {26         int sum = 0;27         //         28         boolean canCompute = (end - start) <= THRESHOLD;29         if(canCompute)30         {31             //      ,     32             for(int i=start; i<=end; i++)33             {34                 sum += i;35             }36         }37         else38         {39             //      ,        40             int middle = (start + end)/2;41             CountTask leftTask = new  CountTask(start,middle);42             CountTask rightTask = new  CountTask(middle+1,end);43             //     44             leftTask.fork();45             rightTask.fork();46             //        ,       47             int leftResult = leftTask.join();48             int rightResult = rightTask.join();49             //     50             sum = leftResult + rightResult;51             52         }53         return sum;54     }55     56     public static void main(String[] args)57     {58         ForkJoinPool forkJoinPool = new ForkJoinPool();59         CountTask task = new CountTask(1,6);60         //      61         Future result = forkJoinPool.submit(task);62         try63         {64             System.out.println(result.get());65         }66         catch (InterruptedException e)67         {68             e.printStackTrace();69         }70         catch (ExecutionException e)71         {72             e.printStackTrace();73         }74         75     }76     77 }

このプログラムは1+2+3+4+5+6を1+2に分割する.3+4;5+6の3つの部分をサブプログラムで計算してマージします.
四、ソースコードの解読
1、leftTask.fork();
1 public final ForkJoinTask fork() {2         Thread t;3         if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)4             ((ForkJoinWorkerThread)t).workQueue.push(this);5         else6             ForkJoinPool.common.externalPush(this);7         return this;8     }

forkメソッドの内部では、現在のスレッドがForkJoinWorkerThreadのインスタンスであるかどうかを判断し、条件が満たされている場合は、taskタスクを現在のスレッドが維持している両端キューにpushします.
 1  final void push(ForkJoinTask> task) { 2             ForkJoinTask>[] a; ForkJoinPool p; 3             int b = base, s = top, n; 4             if ((a = array) != null) {    // ignore if queue removed 5                 int m = a.length - 1;     // fenced write for task visibility 6                 U.putOrderedObject(a, ((m & s) <= m)13                     growArray();14             }15         }

Pushメソッドでは、ForkJoinPoolのsignalWorkメソッドが呼び出され、taskタスクを非同期で実行するためにワークスレッドが起動または作成されます.
2、
 public final V join() {        int s;        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);        return getRawResult();
    }

doJoinメソッドが返すタスクの状態で判断し、NORMALでない場合は例外を投げます.
 private void reportException(int s) {        if (s == CANCELLED)            throw new CancellationException();        if (s == EXCEPTIONAL)
            rethrow(getThrowableException());
    }

doJoinの方法を見てみましょう.
private int doJoin() {        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;        return (s = status)