同時処理利器-CompletionService

3340 ワード

    :         ,          。

サマリ
このようなニーズを考慮して、一連のタスクを同時に処理し、各タスクが完了した後、結果に対していくつかの後続処理を行い、最後に結果をまとめます.1つ目のシナリオ:複数のスレッドの同時処理タスクを開始し、すべてのFutureが戻るまで各スレッドの処理結果Futrueを循環的に監視します.この案は可能だが、すべての結果の完成状況を自分で監視する必要があり、退屈ではないか.CompletionServiceを試してみましょう.
CompletionService
まず、このインタフェースで定義されている方法を見てみましょう.
  • Future submit(Callable task); Callableタスクをコミットし、Future結果を返します.
  • Future submit(Runnable task, V result); 前のメソッドと同様に、タスクが完了すると指定したresultオブジェクトが返されます.
  • Future take() throws InterruptedException; 最新の完了したタスクの結果を取得して削除します.このプロセスはブロックされています.
  • Future poll(); 最新の完了したタスク結果を取得して削除し、結果がなければnullを返します.
  • Future poll(long timeout, TimeUnit unit) throws InterruptedException; 最新のタスク結果の取得と削除を待機するタイムアウト時間を指定します.

  • 結果を取得したいくつかのインタフェースから、最新の完了した結果が返されます.これがポイントで、私たちはすべての結果を待つことを監視する必要はありません(もし待つ最初のFutureが最も遅いならば、他の先に完成した任務を妨げるのではないでしょうか)、結果の完成順にすべての戻り結果を得て、先に完成した結果は先に後続の処理を実行することができて、これは良いのではないでしょうか.
    ExecutorCompletionServiceはこのインタフェースの実装クラスで、内部にスレッドプールとBlockingQueueキューがあります.実装原理は簡単です.ExecutorCompletionServiceにコミットされるタスクごとに、タスクの実行が完了した後にコールバックするdone()メソッドを書き換え、実行が完了したFutureTaskを内部キューに追加します.take()などの方法は,実際には内部キューに最新の完了結果FutrueTaskを取得する.
    コントラスト
    コードの面から2つのシナリオの違いを見てみましょう.
    public void test1() {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        List> results = new ArrayList>(10);
        for(int i=0; i<10; i++) {
            Future result = executorService.submit(new MyRunnable());
            results.add(result);
        }
        for(Future result : results) {
            String str = result.get();//       Future,            ,           
            //do something
        }
        //    
        executorService.shutdown();
    }
    
    public void test2() {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        CompletionService completionService = new executorCompletionService(executorService);
        for(int i=0; i<10; i++) {
            completionService.submit(new MyRunnable());
        }
        for(int i=0; i<10; i++) {
            String str = completionService.take().get();//             
            //do something
        }
        //    
        executorService.shutdown();
    }
    

    コード量の差は少ないが,シナリオ2ではリストを1つだけ維持してすべての処理結果Futureを保存する必要はない.重要なのは、completionServiceは、タスクの結果が完了順に次々と到来するため、各タスクの進捗状況が相互に干渉しないため、後続の操作も相互に影響しません.第1のシナリオでは、最初のタスクが遅い場合、他のタスクは第1のタスクが完了するまで待機しなければなりません.これは、パフォーマンスに明らかな影響を及ぼします.
    踏み込み注意
  • スレッドプール
  • を閉じる
    テストメソッドでは、プレゼンテーションのためにスレッドプールが作成され、メソッドの終了時にもスレッドプールが閉じられます.コードがサンプルコードと似ている場合は、スレッドプールを閉じることを覚えておいてください.そうしないと、メソッドが終了しても、作成したスレッドは回収および閉じることができず、遅かれ早かれリソースが消費されたり、メモリが爆発したりします.スレッドプールがグローバルに共有されている場合は、この問題はなく、JVMが閉じるとスレッドプールが閉じます.
  • 誤った使い方
  • public void test3() {
        Future future = completionService.submit(new MyRunnable());//          
        String str = future.get();
        //do something
    }
    

    このシーンはcompleteServicesを使用するのに適していないかもしれませんが、ここではもう一つの問題を説明し、completionServicesを直接使用します.submitの戻り結果Futureは、現在の戻り結果のみを取得することに関心を持ち、BlockingQueueに保存されているFutureオブジェクトを無視すると、BlockingQueueキューが大きくなり(デフォルトではLinkedBlockingQueue、無境界キュー)、遅かれ早かれメモリが爆発します.正しい使い方はcompletionServicesです.take()を使用してFutureオブジェクトを取得します.
    何か説明が間違っている場合は、指摘を歓迎します.