java.util.concurrent.ExecutorCompletionService


具体的には、ExecutorCompletionServiceがどのようにCompletionServiceを実装するか(ソースコードの表示をクリック)を見て、前編で述べた優雅な実装の2つのパスを実装します.
アップソースコード
package java.util.concurrent;

public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;

    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

    private RunnableFuture<V> newTaskFor(Callable<V> task) {
        if (aes == null)
            return new FutureTask<V>(task);
        else
            return aes.newTaskFor(task);
    }

    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
        if (aes == null)
            return new FutureTask<V>(task, result);
        else
            return aes.newTaskFor(task, result);
    }

    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

    public ExecutorCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }

    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }

    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture(f));
        return f;
    }

    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    public Future<V> poll() {
        return completionQueue.poll();
    }

    public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

}

ExecutorCompletionServiceはCompletionService(ソースコードの表示をクリック)の実装です.ExecutorCompletionServiceは、構造関数をExecutorに注入し、キューをブロックすることによって、タスクの実行と戻りを実現します.
まずメンバー変数を見てみましょう
private final Executor executor; これは最も基本的なアクチュエータです(ソースコードを表示するにはクリックしてください)
private final AbstractExecutorService aes; これはライフサイクル付きのアクチュエータです(ソースコードの表示をクリックしてください)
private final BlockingQueue> completionQueue; これは、タスク実行結果を保存するためのブロックキューです.
private class QueueingFuture extends FutureTask
    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

これは内部クラスで、主にFutureTaskを継承し、構造方法、RunnableFutureを親クラスFutureTaskに提出する構造方法であり、同時にdoneという方法を実現し、意味は簡単で、タスクの実行が完了したときにRunnableFutureをブロックキューに入れる.
ここが少しめまいがするかもしれませんが、RunnableFuture(ソースコードの表示をクリック)、FutureTaskはいったいどんな関係ですか?
簡単に紹介する
RunnableFuture(ソースコードの表示をクリック)は、RunnableとFuture(ソースコードの表示をクリック)を継承するインタフェースであり、FutureTaskはRunnableFutureの実装である.計算結果を取得するには、同期または非同期で実行できます.キャンセルできます.
彼のやり方を見てみろ
private RunnableFuture newTaskFor(Callable task)
private RunnableFuture<V> newTaskFor(Callable<V> task) {
        if (aes == null)
            return new FutureTask<V>(task);
        else
            return aes.newTaskFor(task);
    }

パラメータ:Callable(クリックしてソースコードを表示)1つの実行タスク
戻る:RunnableFutureで紹介したばかりです
実装:
  • AbstractExecutorService(ソースコードの表示をクリック)が空の場合、FutureTaskを直接インスタンス化して戻ります.そうしないと、AbstractExecutorServiceのnewTaskForメソッド
  • が呼び出されます.
    解読:論理を実現するのは簡単で、肝心なのはこの論理判断で、AbstractExecutorServiceのnewTaskFor方法を覚えていますか?上のnullの実装とそっくりです.どうしてこんなことをするの?簡単ですが、AbstractExecutorServiceは継承と書き換えが可能です.転送されたAbstractExecutorServiceが書き換えられている場合.では、書き換えたAbstractExecutorServiceを使います.
    職責単一の設計原則を覚えていますか?クラスは1つのことしかできません.AbstractExecutorServiceがライフサイクルを担当している場合は、ライフサイクルに関連する実装はできません.それが正しい.もちろん、Executorと提携している場合、Executorはポリシーの実行のみを担当しているため、AbstractExecutorServiceは結果を返すパッケージ化を行うことができます.
    private RunnableFuture newTaskFor(Runnable task, V result)
    同上
    public ExecutorCompletionService(Executor executor)
        public ExecutorCompletionService(Executor executor) {
            if (executor == null)
                throw new NullPointerException();
            this.executor = executor;
            this.aes = (executor instanceof AbstractExecutorService) ?
                (AbstractExecutorService) executor : null;
            this.completionQueue = new LinkedBlockingQueue<Future<V>>();
        }

    パラメータ:Executorエグゼクティブ
    実装:
  • 非空判定
  • Executorをメンバー変数
  • に割り当てる
  • ExecutorがAbstractExecutorServiceである場合、AbstractExecutorServiceにも
  • が割り当てられます.
  • 新しいチェーンブロックキュー
  • 解読:AbstractExecutorServiceはExecutorの実装であるため、当然Executorに値を割り当てることができ、実行されたタスクはExecutorに任せて完了する.キャンセルしたり、実行の状態を観察したりすれば、構築時にAbstractExecutorServiceで実現するはずです.
    public ExecutorCompletionService(Executor executor,  BlockingQueue> completionQueue)
    同様に、キューをブロックする方法がより柔軟になります.
    public Future submit(Callable task)
        public Future<V> submit(Callable<V> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<V> f = newTaskFor(task);
            executor.execute(new QueueingFuture(f));
            return f;
        }

    解読:Executorにタスクを提出して実行し、提出したオブジェクトは先にQueueingFutureにカプセル化される.QueueingFutureは実行結果をブロックキューに入れます
    public Future submit(Runnable task, V result)
    同上
    take() poll() poll(long timeout, TimeUnit unit)
        public Future<V> take() throws InterruptedException {
            return completionQueue.take();
        }
    
        public Future<V> poll() {
            return completionQueue.poll();
        }
    
        public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
            return completionQueue.poll(timeout, unit);
        }

    この3つの方法を一緒に説明すると、主にブロックキューの3つの方法が呼び出されます.具体的には、ブロックキューの1編を参照してください.