マルチスレッド(5)

20233 ワード

invokeAllが言った後、AbstractExecutorServiceのinvokeAnyメソッドを見てみましょう.このメソッドは、invokeAllとは異なり、与えられたタスクでは、あるタスクが完了した場合(異常放出がない場合)、タスク実行の結果が返されます.この点はメソッドの戻り値からもわかります.すべてのタスクの完了は要求されません.1つの完了(異常がない場合)だけです.JDKの説明:<T> T invokeAny(Collection<? extends Callable<T>> tasks)・与えられたタスクを実行し、あるタスクが正常に完了していれば(つまり異常を投げ出さなかった)、その結果を返す.
戻り値がTであることもこの点を示している.
ソース:
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)

        throws InterruptedException, ExecutionException {

        try {

            return doInvokeAny(tasks, false, 0);

        } catch (TimeoutException cannotHappen) {

            assert false;

            return null;

        }

    }



    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,

                           long timeout, TimeUnit unit)

        throws InterruptedException, ExecutionException, TimeoutException {

        return doInvokeAny(tasks, true, unit.toNanos(timeout));

    }

このコードは何も言うことはありません.ポイントはdoInvokeAny()メソッドの上で、直接ソースコードにアップロードすることです.
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,

                            boolean timed, long nanos)

        throws InterruptedException, ExecutionException, TimeoutException {

        if (tasks == null)

            throw new NullPointerException();

        int ntasks = tasks.size();

        if (ntasks == 0)

            throw new IllegalArgumentException();

        List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);

        ExecutorCompletionService<T> ecs =

            new ExecutorCompletionService<T>(this);



        // For efficiency, especially in executors with limited

        // parallelism, check to see if previously submitted tasks are

        // done before submitting more of them. This interleaving

        // plus the exception mechanics account for messiness of main

        // loop.



        try {

            // Record exceptions so that if we fail to obtain any

            // result, we can throw the last exception we got.

            ExecutionException ee = null;

            long lastTime = timed ? System.nanoTime() : 0;

            Iterator<? extends Callable<T>> it = tasks.iterator();



            // Start one task for sure; the rest incrementally

            futures.add(ecs.submit(it.next()));

            --ntasks;

            int active = 1;



            for (;;) {

                Future<T> f = ecs.poll();

                if (f == null) {

                    if (ntasks > 0) {

                        --ntasks;

                        futures.add(ecs.submit(it.next()));

                        ++active;

                    }

                    else if (active == 0)

                        break;

                    else if (timed) {

                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);

                        if (f == null)

                            throw new TimeoutException();

                        long now = System.nanoTime();

                        nanos -= now - lastTime;

                        lastTime = now;

                    }

                    else

                        f = ecs.take();

                }

                if (f != null) {

                    --active;

                    try {

                        return f.get();

                    } catch (ExecutionException eex) {

                        ee = eex;

                    } catch (RuntimeException rex) {

                        ee = new ExecutionException(rex);

                    }

                }

            }



            if (ee == null)

                ee = new ExecutionException();

            throw ee;



        } finally {

            for (Future<T> f : futures)

                f.cancel(true);

        }

    }

これは複雑ですが、一歩一歩です.
まず、タスクに基づいて戻り値を設定します.
List> futures= new ArrayList>(ntasks);
ntasksは現在のタスク数です.
次に、ExecutorCompletionService JDKのクラスの作成について説明します.
提供されたExecutorを使用してタスクを実行するCompletionService.このクラスでは、完了時にコミットされたタスクを手配し、takeを使用してアクセス可能なキューに配置します.このクラスは非常に軽く、いくつかのタスクを実行するときに一時的に使用するのに適しています.具体的なソースコードは、後で分析します.
具体的な実装コードはtry/catchで
ntasks現在のタスク数
Active現在実行中のプロセス数
//Start one task for sure; the rest incrementally
futures.add(ecs.submit(it.next()));
まずtaskをコミットし、次にループに入り、コミット効率を向上させる.
そして無限ループに入り、2つの状況に分けられます.
if (f == null)
まずecsについてお話ししますPollのメソッドの役割:次の完了したタスクを表すFutureを取得して削除し、このようなタスクが存在しない場合nullを返します.
f==nullがこの件が任務を完了していないことを説明した場合、3つの状況に分けられます.
(1)現在のプロセスがまだコミットされていない場合は、プロセスをコミットし続ける(後になるほど遅くなるため、タスクをコミットする際に実行時間の短いタスクを前に置くのが一般的である.
(2)現在実行されているプロセス数が0の場合、直接的にループをスキップし、これは異常な分岐であり、ループをスキップした後、if(ee==null)ee=new ExecutionException()を実行する.throw ee; 異常を投げ出す.
(3)既に提出可能なタスクがなく、かつ実行中のタスクがある場合、すなわち(1)/(2)が成立しない場合は、直ちにタスクの結果を取得し、時間制限がある場合はf=ecsである.poll(nanos, TimeUnit.NANOSECONDS);,取得できない場合はタイムアウト異常を放出します.ないのはf.get()です.
ここで条件(3)について説明するが、提出可能なタスクがなく、かつ現在実行されているタスクが(3)までしか実行されない場合がある.これも、より迅速に結果を得るために、タスクが最初に完了したタスクのうち、どのタスクが最初に完了したかを早急に取得するためである.これにより、取得タスクの結果をブロックすることができる.
If(f != null)
f nullに対して既にタスクが完了していることを説明せず、実行中のタスク数を減らし、実行結果を直接取得する(ブロックされる可能性がある).
最後にfinallyで、すべてのタスクをキャンセルします.
InvokeAllの主な論理は分析済みで、私たちはまだ小さな尾を残しています.彼はどのように任務が実行された後、直接結果のシーケンスに詰め込まれることを保証しますか.この比較の重要なのは、Poll方法です.
まず、呼び出したコンストラクション関数のソースコードを見てみましょう.
public ExecutorCompletionService(Executor executor,

                                     BlockingQueue<Future<V>> completionQueue) {

        if (executor == null || completionQueue == null)

            throw new NullPointerException();

        this.executor = executor;

        this.aes = (executor instanceof AbstractExecutorService) ?

            (AbstractExecutorService) executor : null;

        this.completionQueue = completionQueue;

    }

構築された関数から見ると、このexecutorはabstractExecutorServiceです.主な方法は次のとおりです.
ecs.submit(it.next())
ecs.poll();
ecs.poll(nanos, TimeUnit.NANOSECONDS);
これらの方法の主な鍵は、結果キューcompletionQueueにあり、takeでもpollでもBlockingQueueがサポートしており、ソースコードのメソッドpoll()およびtake()から見ることができる.
public Future<V> submit(Runnable task, V result) {

        if (task == null) throw new NullPointerException();

        RunnableFuture<V> f = newTaskFor(task, result);

        executor.execute(new QueueingFuture(f));

        return f;

    }



    public Future<V> take() throws InterruptedException {

        return completionQueue.take();

    }



    public Future<V> poll() {

        return completionQueue.poll();

    }

では、完了したタスクはどのようにこのキューに配置されますか?まず、タスクをコミットする方法に注目する必要があります.
public Future<V> submit(Runnable task, V result) {

        if (task == null) throw new NullPointerException();

        RunnableFuture<V> f = newTaskFor(task, result);

        executor.execute(new QueueingFuture(f));

        return f;

    }

QueueingFutureは内部クラス、ソースコードは:
private class QueueingFuture extends FutureTask<Void> {

        QueueingFuture(RunnableFuture<V> task) {

            super(task, null);

            this.task = task;

        }

        protected void done() { completionQueue.add(task); }

        private final Future<V> task;

    }

この方法の再定義を見ると、protected void done(){completionQueue.add(task);}
インタフェースdone()のメソッドJDKについて説明します.
protected void done()
このタスクがステータスisDone(通常であれキャンセルであれ)に変換されると、保護されたメソッドが呼び出されます.デフォルトのインプリメンテーションでは、アクションは実行されません.サブクラスは、コールバックの完了または簿記の実行を呼び出すためにこのメソッドを書き換えることができます.このメソッドのインプリメンテーション内のステータスを問い合せて、タスクがキャンセルされたかどうかを判断できます.
サブクラスQueueingFutureはdone()メソッドを書き換え、taskが完了すると実行結果をcompletionQueueに格納します.前述したタスクが完了すると、完了したブロックキューに格納されます.
このため、JDKではExecutorCompletionServiceについて2つのアプリケーションがあります.
ある問題に対する一連のソルバがあると仮定すると、各ソルバはあるタイプのResult値を返し、同時に実行したい場合は、null値でない各ソルバの戻り結果をメソッドuse(Resultr)で処理します.プログラムは次のように記述できます.
void solve(Executor e, Collection<Callable<Result>> solvers)



    throws InterruptedException, ExecutionException {



        CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);



        for (Callable<Result> s : solvers)



            ecs.submit(s);



        int n = solvers.size();



        for (int i = 0; i < n; ++i) {



            Result r = ecs.take().get();



            if (r != null)



                use(r);



        }

タスクセットの最初のnull以外の結果を使用して、例外が発生したタスクを無視し、最初のタスクが準備完了したときに他のすべてのタスクをキャンセルしたいとします.
void solve(Executor e, Collection<Callable<Result>> solvers) 

      throws InterruptedException {

        CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);

        int n = solvers.size();

        List<Future<Result>> futures = new ArrayList<Future<Result>>(n);

        Result result = null;

        try {

            for (Callable<Result> s : solvers)

                futures.add(ecs.submit(s));

            for (int i = 0; i < n; ++i) {

                try {

                    Result r = ecs.take().get();

                    if (r != null) {

                        result = r;

                        break;

                    }

                } catch(ExecutionException ignore) {}

            }

        }

        finally {

            for (Future<Result> f : futures)

                f.cancel(true);

        }



        if (result != null)

            use(result);

    }

JDKで説明した2番目のケースとinvokeAnyで比較した像は,類比可能である.