AbstraactExecutorServiceタスクのコミット


詳細
最後に2つのinvokeAnyメソッドを見てみましょう.このメソッドとinvokeAllの違いは、invokeAllは、すべてのタスクの実行が完了するまで(完了orキャンセルor異常)ブロックされます(すべてのタスクの結果が返されます).invokeAnyは、いずれかのメソッドが完了するとすぐに返されます(戻るときに最初に実行されたタスクの結果).コードを確認すると、この2つのinvokeAnyメソッドはdoInvokeAnyメソッドを直接呼び出して実現されています.
 
public  T invokeAny(Collection extends Callable> tasks)
        throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            assert false;
            return null;
        }
    }

    public  T invokeAny(Collection extends Callable> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }

 
一つはタイムアウト制限があり、一つはありません.メインコードはdoInvokeAnyメソッドを見て、まず署名を見なければなりません.
 
private  T doInvokeAny(Collection extends Callable> tasks,
                            boolean timed, long nanos)

 
1番目のパラメータtasksはCallableタスクセットであり、2番目のパラメータはタイムアウト制限があるかどうかを示し、3番目のパラメータはタイムアウト制限がある場合、時間はnanosナノ秒以内に制限されることを示します.
具体的な実装部分を見てみましょう.
 
if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        List> futures= new ArrayList>(ntasks);
        ExecutorCompletionService ecs =
            new ExecutorCompletionService(this);

        // For efficiency, especially in executors with limited
        // parallelism, check to see if previously submitted tasks are
        // done before submitting more of them. This interleaving
        // plus the exception mechanics account for messiness of main
        // loop.

        try {
            // Record exceptions so that if we fail to obtain any
            // result, we can throw the last exception we got.
            ExecutionException ee = null;
            long lastTime = (timed)? System.nanoTime() : 0;
            Iterator extends Callable> it = tasks.iterator();

            // Start one task for sure; the rest incrementally
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;

            for (;;) {
                Future f = ecs.poll();
                if (f == null) {
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    else if (active == 0)
                        break;
                    else if (timed) {
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        long now = System.nanoTime();
                        nanos -= now - lastTime;
                        lastTime = now;
                    }
                    else
                        f = ecs.take();
                }
                if (f != null) {
                    --active;
                    try {
                        return f.get();
                    } catch (InterruptedException ie) {
                        throw ie;
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }

            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            for (Future f : futures)
                f.cancel(true);
        }

 
本体(try{...}finally{...})の部分を直接見ると、ntasksは現在まだいくつかのタスクがコミットされていないことを示し、activeは現在コミットされているタスク数を示す.tryではまず1つをコミットし、続いて無限ループfor(;)、すべてはこの無限の循環の中にあり、この無限の循環を分析するには、2つの部分に分けることができます.
 
タスクをコミットして実行し、if(f==null)セクション:
 
1)まずタスクをコミットし、まだタスクがコミットされていない場合(ntasks>0)にタスクをコミットします.
2)そうでない場合はスレッドが実行されているかどうかをチェックし(すなわちactive==0)、active=0が成立するとこの無限ループから飛び出し、これは異常分岐であり、最終的に後続コードthrow eeを投げ出す.
3)1)2)両方のステップが成立しない場合,時間制限であるtimed=trueがある場合はecs.poll(nanos, TimeUnit.NANOSECONDS);取得、取得できない場合は投げます.時間制限がなければ直接ecs.take取得.
 
(Note:ここで特に注意したいのは、タスクがコミットされていない場合、2)と3)に実行することは不可能です.つまり、3)に実行されると、すべてのタスクがコミットされたことを説明します.これにより、3番目のステップが取得したのは、すべてのタスクの中で最初に完了したタスクの結果であり、3番目のステップが取得をブロックできる理由であることが保証されます.)
 
取得タスクif(f!=null)セクション
ループに入るとまず1つのタスク(タスクがコミットできる前提の下で)がコミットされ、すぐにfがnullであるかどうかをチェックします.部位nullであれば、すでにタスクが実行されていることを示します.おめでとうございます.追加のタスクをしなくてもいいです.ここで極端な場合、最初のタスク(forループの外で24行目にコミットされた)が最初のforループの29行で結果を得た場合、2番目のタスクはコミットされません.同様に,3回目のサイクルで結果が得られた場合,得られた結果は前の3つのタスクの中で最も速い結果であり,後のタスクはまったく実行されないと分析できる.
結果を得るのは簡単で、futureを直接通過します.get()は結果を取得し、この結果は正常な結果であるか、異常を投げ出すかのいずれかである.正常な結果が得られたらreturnします.そうしないと、後続のコードに異常が放出されます.テスト例は比較的簡単で貼らない.
 
最後にfinallyですべてのタスクをキャンセルします.
 
ここでさらに分析する必要があります.ここでコミットするタスクは、現在のAbstractExecutorServiceオブジェクトではなく、現在のスレッドエフェクタオブジェクトをExecutorCompletionServiceにカプセル化します.
 
ExecutorCompletionService ecs = new ExecutorCompletionService(this);
このecsでタスクをコミットし、結果を取得するため、ExecutorCompletionService、特に現在のメソッドで使用されているsubmit(task)、poll()、poll(nanos、TimeUnit.NANOSECONDS)、take()メソッドを確認します.構築方法から見ると、スレッドエフェクタExecutorはAbstractExecutorServiceから渡されたthisオブジェクトであり、タスクが完了したキューcompletionQueueはLinkedBlockingQueueオブジェクトである.take()でもpoll()でも、完了したタスクを取得するときに、このブロックキューで取得すればよいことが明らかになったようです.コードを表示するには、次のようにします.
 
public Future take() throws InterruptedException {
        return completionQueue.take();
    }

    public Future poll() {
        return completionQueue.poll();
    }

    public Future poll(long timeout, TimeUnit unit) throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

 
タスクの実行が完了したら、このブロックキューにどのように配置されますか?詳しく見ると、タスクのコミットが発見された場合、このクラスではRunnableFutureオブジェクトをさらにQueueingFutureにパッケージ化しますが、このQueueingFutureはFutureTaskから継承されています.明らかにQueueingFutureはFutureTaskとして追加のことをする必要があります.
 
「もちろん、cancelで自分のことをしたいなら、FutureTaskのdone()メソッドを書き換えることができます.cancelが戻る前にメソッドが呼び出されるので、このメソッドはデフォルトでは何もしません」
 
ここで補足すると、実はcancelの他に、異常が発生してinnerSetExceptionを設定したり、正常終了設定結果値innerSetを実行したりする場合にもdone()メソッドが呼び出されます.だから、タスクの実行が終わった後に自分のことをしたいなら、このdone()メソッドを書き直すことができます.QueingFutureコードを見ると、案の定:
 
private class QueueingFuture extends FutureTask {
        QueueingFuture(RunnableFuture task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future task;
    }

 
タスクの実行(完了orキャンセルor異常)が完了すると、上のdoneメソッドを呼び出すと、現在のタスクがこのブロックキューに追加されます.これは明らかです.ほほほ!
最初にタスクを完了する3つの方法を詳しく見てください.Poll()はキューヘッダオブジェクトを取得し、キューが空でnullを返すと、この方法はブロックされません.completionQueue.poll(timeout,unit)は時間timeoutをブロックし、この時間が空のキューである場合nullを返します.completionQueue.take()は、キューに何かが戻るまでブロックされます.
AbstractExecutorServicesに戻ります.doInvokeAnyメソッドは,その無限ループ部分コードを再度見て,まず1回の結果を取得しようと試みたが,今回は非ブロック取得であり,結果がなければすぐに次のタスクをコミットし,コミットが完了すれば今回のループは終了する.すべてのタスクがコミットされるまで、ループはブロック取得結果セクションのコードに入ります.時間制限があればpoll(timeout,unit)を使用して許容時間範囲内でブロックします.そうしないとtake()永久ブロックで取得します.ブロックの論理実装はブロックキューに渡されます.
この部分も記録し終わって、少し乱れていて、書いているのがわかるかどうか分かりません.