ListenableFuture in Guava

8418 ワード

ListenableFutureの説明


同時プログラミングは難題であるが,強力で簡単な抽象は同時作成を著しく簡略化することができる.このような考えから、GuavaはListenableFutureインタフェースを定義し、JDK concurrentパッケージの下のFutureインタフェースを継承している.ListenableFutureでは、コールバックメソッド(callbacks)を登録し、演算(マルチスレッド実行)が完了したときに呼び出したり、演算(マルチスレッド実行)が完了した後にすぐに実行したりすることができる.このような簡単な改良により、より多くの操作を明らかにサポートすることができ、このような機能はJDK concurrentのFutureではサポートされていない.高同時性かつ大量のFutureオブジェクトが必要な場合は、できるだけListenableFutureを使用することをお勧めします.
ListenableFutureの基本手法はaddListener(Runnable,Executor)であり,マルチスレッド演算が完了するとExecutorで指定されたRunnableが実行される.

ListenableFutureの作成と使用


JDKのExecutorServicesに対応する.submit(Callable)がマルチスレッド非同期演算をコミットする方式で、GuavaはListeningExecutorServiceインタフェースを提供し、このインタフェースはListenableFutureを返し、対応するExecutorServiceは通常のFutureを返す.ExecutorServiceからListeningExecutorServiceに移行するにはMoreExecutorsを使用します.ListeningDecorator(ExecutorService)が装飾しています.例:
  
ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

このListeningExecutorServiceにCallableタスクをコミットできます
final ListenableFuture<String> future =  pool.submit(new Callable<String>() {

            @Override

            public String call() throws Exception {

                Thread.sleep(1000*3);

                return     "Task done !";

            }

        });

次にListenerを追加します.
future.addListener(new Runnable() {

                @Override

                public void run() {

                    try {

                        final String contents = future.get();

                        System.out.println(contents);

                    } catch (InterruptedException e) {

                        e.printStackTrace();

                    } catch (ExecutionException e) {

                        e.printStackTrace();

                    }

                }

            }, MoreExecutors.sameThreadExecutor());

私たちは上のコードを見て、確かにあまり優雅ではありません.私たちは投げた異常を処理する必要があります.自分でfutureを通過する必要があります.get()は、前に計算した値を取得します.もっと簡単な方法はありませんか?もちろん、Guavaは上記の書き方の代わりに簡単な方法を提供しています.
Futures.addCallback(future, new FutureCallback<String>() {

                @Override

                public void onSuccess(String result) {

                    System.out.println(result);

                }



                @Override

                public void onFailure(Throwable t) {

                    t.printStackTrace();

                }

            });

完了コードは次のとおりです.
package concurrency;



import com.google.common.util.concurrent.FutureCallback;

import com.google.common.util.concurrent.Futures;

import com.google.common.util.concurrent.ListenableFuture;

import com.google.common.util.concurrent.ListeningExecutorService;

import com.google.common.util.concurrent.MoreExecutors;



import java.util.concurrent.Callable;

import java.util.concurrent.Executors;



/**

 * Created by hupeng on 2014/9/24.

 */

public class ListenableFutureTest {





    public static void main(String[] args) throws InterruptedException {

        ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));



        final ListenableFuture<String> future = pool.submit(new Callable<String>() {

            @Override

            public String call() throws Exception {

                Thread.sleep(1000 * 2);

                return "Task done !";

            }

        });



//            future.addListener(new Runnable() {

//                @Override

//                public void run() {

//                    try {

//                        final String contents = future.get();

//                        System.out.println(contents);

//                    } catch (InterruptedException e) {

//                        e.printStackTrace();

//                    } catch (ExecutionException e) {

//                        e.printStackTrace();

//                    }

//                }

//            }, MoreExecutors.sameThreadExecutor());



        Futures.addCallback(future, new FutureCallback<String>() {

            @Override

            public void onSuccess(String result) {

                System.out.println(result);

            }



            @Override

            public void onFailure(Throwable t) {

                t.printStackTrace();

            }

        });



        Thread.sleep(5 * 1000);  //wait for task done



        pool.shutdown();

    }

}