Fork/joinフレームワークのForkJoinPool


概要


jdk 7には同時フレームワーク-fork/joinフレームワークが追加され、このフレームワークの下で、ForkJoinTaskは実行する必要があるタスクを表し、本当にこれらのタスクを実行するスレッドはスレッドプール(ForkJoinPool)の中に置かれています.ForkJoinPoolはForkJoinTaskを実行できるExcuteServicesです.ExcuteServicesとは異なり、work-stealingモードを採用しています.プール内のすべてのスレッドが他のスレッドで作成されたサブタスクを実行しようとします.これにより、スレッドがアイドル状態になることは少なく、非常に効率的です.
プールにはForkJoinWorkerThreadオブジェクト配列が維持されています.配列サイズはparallelismプロパティによって決まり、parallelismはデフォルトでプロセッサ数です.
int n = parallelism << 1;
        if (n >= MAX_ID)
            n = MAX_ID;//MAX_ID=0x7fff
        else { 
            n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8;
        }
        workers = new ForkJoinWorkerThread[n + 1];
可視スレッド数は0 x 7 fffを超えない.

スレッドの追加


どのような場合にスレッドを追加する必要がありますか?新しいタスクが来ると、スレッドプールは他のスレッドに処理を通知し、待機中のスレッドがないか、アクティブなスレッドが非常に少ない場合(ctlプロパティで判断される)、スレッドプールにスレッドを追加します.
    private void addWorker() {
        Throwable ex = null;
        ForkJoinWorkerThread t = null;
        try {
            t = factory.newThread(this);
        } catch (Throwable e) {
            ex = e;
        }
        if (t == null) {  // null or exceptional factory return
            long c;       // adjust counts
            do {} while (!UNSAFE.compareAndSwapLong
                         (this, ctlOffset, c = ctl,
                          (((c - AC_UNIT) & AC_MASK) |
                           ((c - TC_UNIT) & TC_MASK) |
                           (c & ~(AC_MASK|TC_MASK)))));
            // Propagate exception if originating from an external caller
            if (!tryTerminate(false) && ex != null &&
                !(Thread.currentThread() instanceof ForkJoinWorkerThread))
                UNSAFE.throwException(ex);
        }
        else
            t.start();
    }
スレッドを追加するコードは比較的簡単で、工場クラスを通じてスレッドを作成し、ForkJoinWorkerThreadを呼び出すrunメソッドを通じてこのスレッドを起動します.失敗した場合はctl以前の値を復元し、スレッドを終了します.ファクトリクラスは直接その構築方法を呼び出し、最終的にスレッドを追加するのはregisterWorkerメソッドで完了します.
for (int g;;) {
            ForkJoinWorkerThread[] ws;
            if (((g = scanGuard) & SG_UNIT) == 0 &&
                UNSAFE.compareAndSwapInt(this, scanGuardOffset,
                                         g, g | SG_UNIT)) {
                int k = nextWorkerIndex;
                try {
                    if ((ws = workers) != null) { // ignore on shutdown
                        int n = ws.length;
                        if (k < 0 || k >= n || ws[k] != null) {
                            for (k = 0; k < n && ws[k] != null; ++k)
                                ;
                            if (k == n)
                                ws = workers = Arrays.copyOf(ws, n << 1);
                        }
                        ws[k] = w;
                        nextWorkerIndex = k + 1;
                        int m = g & SMASK;
                        g = (k > m) ? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1);
                    }
                } finally {
                    scanGuard = g;
                }
                return k;
            }
            else if ((ws = workers) != null) { // help release others
                for (ForkJoinWorkerThread u : ws) {
                    if (u != null && u.queueBase != u.queueTop) {
                        if (tryReleaseWaiter())
                            break;
                    }
                }
            }

ここで属性scanGuardは,Guardという言葉から何を保護しているのか,worksという配列を保護しているのかを知る必要がある.この配列を更新する必要がある場合,scanGuardを絶えずチェックすることで保護の目的を達成する.フレーム全体にシーケンスロックが大量に採用されており、ブロックされないメリットがあり、悪いところは追加のループがあることです.ここでもループによってこのスレッドを登録し、ループの過程で2つの状況が発生した:1、compareAndSwapInt操作に成功した;2、操作に失敗しました.
1つ目は、workers配列をスキャンし、空のアイテムを見つけ、新しく作成したスレッドをこの位置に配置することです.見つからない場合は、配列のサイズが足りないことを示す場合は、配列を2倍に拡大します.
第2のケース:ループ再試行が直接成功するまで、コードから、失敗しても、他のスレッドに完了していないタスクを実行するように通知することを忘れません.

タスクの実行


ExecutorServiceから継承されたexecuteメソッドとsubmitメソッドのほか、この2つのメソッドを上書きして再ロードしました.
public void execute(ForkJoinTask> task) {
    if (task == null)
        throw new NullPointerException();
    forkOrSubmit(task);
}

public void execute(Runnable task) {
    if (task == null)
        throw new NullPointerException();
    ForkJoinTask> job;
    if (task instanceof ForkJoinTask>) // avoid re-wrap
        job = (ForkJoinTask>) task;
    else
        job = ForkJoinTask.adapt(task, null);
    forkOrSubmit(job);
}
パラメータがRunnableのexecuteを強化し、Runnableという一般的なタスクをForkJoinTaskというタスクに適合させ、その後パラメータとしてforkOrSubmitメソッドに伝達して統一的に処理した.
    private  void forkOrSubmit(ForkJoinTask task) {
        ForkJoinWorkerThread w;
        Thread t = Thread.currentThread();
        if (shutdown)
            throw new RejectedExecutionException();
        if ((t instanceof ForkJoinWorkerThread) &&
            (w = (ForkJoinWorkerThread)t).pool == this)
            w.pushTask(task);
        else
            addSubmission(task);
    }
上記のコードから分かるように、この2つのタスクの最終的な帰属はまだ異なり、ForkJoinTaskというタスクはスレッド内部のキューに配置され、普通のRunnableタスクはスレッドプールのキューに配置されています.
executeメソッドを呼び出すほか、ForkJoinTaskタスクに対してforkメソッドを呼び出すことで、自分のスレッドキューにタスクを追加することもできます.

スレッドプールの終了


ExecutorServiceと同様に、shutdown()とshutdownNow()を呼び出してスレッドを終了できます.各スレッドのタスクステータスをCANCELLLEDに設定し、Threadのinterruptメソッドを呼び出して各スレッドを終了します.
まとめ:ForkJoinPoolはExcuteServicesです.ExcuteServicesとは異なります.
1、ExcuteServicesはRunnable/Callableタスクを実行し、ForkJoinPoolはRunnableタスクのほか、ForkJoinTaskタスクを実行することができる.
2、ExcuteServicesの後のタスクは前のタスクの実行を待ってから実行する必要がありますが、ForkJoinPoolはwork-stealingモードで他のスレッドのタスクの実行を支援します.つまり、ExcuteServicesは同時問題を解決し、ForkJoinPoolは並列問題を解決します.
次のセクションでは、Fork/JoinフレームワークのForkJoinTaskタスクの分析を行います.