Java 8非同期プログラミング-completableFuture

9469 ワード

非同期プログラミングの難点
非同期プログラミングをどのように優雅に実現するかはずっと難題であり、非同期プログラミングの一般的な方法はcallbackの方法を採用することであるが、この方法は通常、通常のプロセスのコードにコードをネストし、多層ネストがある場合、コードのメンテナンスがさらに困難である.
また、非同期プログラミングの異常処理もメンテナンスされていないことは困難であり、特にJavaでは、非同期プログラミングは通常新しいスレッドで完了するが、サブスレッドの異常は親スレッドで取得できないため、非同期実行結果の取得には、ポーリング、イベント駆動などによってより大きな代価を払う必要がある.
Futureから言えば
Java 5はその後、Futureを非同期プログラミングに導入し、get()の方法で非同期実行結果の同期待ちと結果を取得する.
Future doSomething = Executors.newSingleThreadExecutor().submit(() -> {
    try {
        Thread.sleep(1000 * 3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "success";
});

String result = doSomething.get();

System.out.println(result);

FutureのApiは比較的簡単であるが,異常処理に友好的ではなく,同時に複数の非同期操作を同時に行う必要がある場合は扱いにくい.
ユーザがログインしてログイン証明書(token)を取得し、ログインしてユーザ情報を取得するシーンがあると仮定する
ExecutorService executors = Executors.newFixedThreadPool(10);

Future login = executors.submit(()->login());
String token = login.get();

Future userInfo = executors.submit(() -> userInfo(token));
String userInfoResult = userInfo.get();

System.out.println(userInfoResult);

この実装方法では,本格的な非同期プログラミングが実現できないか,あるいは我々が望んでいるものではなく,ログイン後にユーザ情報を取得することを望んでいるが,この2つのことが完了した後,結果を統一的に処理し,この方法はログインを待ってからユーザ情報を取得することであり,同期呼び出しと同様であり,我々の想定と一致しない.
CompletableFuture
初対面CompletableFuture
Java 8にはCompletableFutureクラスが導入され、同時にFutureインタフェースとCompletionStageインタフェースが実現され、非同期プログラミング用のApiインタフェースが提供され、非同期処理が提供される
CompletableFutureでは、JavaのPromiseと言える非同期プログラミングの操作が多数用意されています.次に、CompletableFutureによって上記の例を実現します.
String userInfo = CompletableFuture.supplyAsync(() -> login())
    .thenApplyAsync(token -> userInfo(token))
    .get();

System.out.println(userInfo);

CompletableFuture API
CompletableFutureは方法が多く、機能も豊富で、ここでは一つ一つ説明しないが、主にこれらの種類に分けて使用することができる.
1.CompletableFutureをFutureとして使用
CompletableFutureはFutureインタフェースを実現し、つまりFutureができるCompletableFutureも同様に使用でき、completecompleteExceptionallyの方法を加えて結果の終了を制御することができる.
CompletableFuture f = new CompletableFuture<>();

Executors.newSingleThreadExecutor().submit(()->{
    f.complete("hello");
    //f.completeExceptionally(new RuntimeException("error"));
});

String result = f.get();

System.out.println(result);

複数の非同期動作を同時に実行するには、CompletableFutureを使用します.
CompletableFuture f = new CompletableFuture<>();

new Thread(() -> {
    try {
        System.out.println("thread1:" + f.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}).start();

new Thread(() -> {
    try {
        System.out.println("thread2:" + f.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}).start();

f.complete("hello");

2.非同期操作
非同期アクションを作成する方法は、主に次のとおりです.
public static CompletableFuture runAsync(Runnable runnable)
public static CompletableFuture runAsync(Runnable runnable,Executor executor)
public static  CompletableFuture supplyAsync(Supplier supplier)
public static  CompletableFuture supplyAsync(Supplier supplier,Executor executor)

次のように使用します.
CompletableFuture f = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "hello";
});

String result = f.get();

System.out.println(result);

3.連続非同期操作
public CompletableFuture thenRun(Runnable action)
public CompletableFuture thenRunAsync(Runnable action)
public CompletableFuture thenRunAsync(Runnable action,Executor executor)
public  CompletableFuture thenApply(Function super T,? extends U> fn) 
public  CompletableFuture thenApplyAsync(Function super T,? extends U> fn)
public  CompletableFuture thenApplyAsync(Function super T,? extends U> fn, Executor executor) 
public CompletableFuture thenAccept(Consumer super T> action) 
public CompletableFuture thenAcceptAsync(Consumer super T> action) 
public CompletableFuture thenAcceptAsync(Consumer super T> action,Executor executor) 

次のように使用します.
CompletableFuture f = CompletableFuture
                .supplyAsync(() -> "hello")
                .thenApplyAsync(res -> res + " world!")
                .thenAcceptAsync(System.out::println);
// wait for job done
f.get();

4.操作完了待ち
public CompletableFuture whenComplete(BiConsumer super T, ? super Throwable> action) 
public CompletableFuture whenCompleteAsync(BiConsumer super T, ? super Throwable> action) 
public CompletableFuture whenCompleteAsync(BiConsumer super T, ? super Throwable> action, Executor executor)

次のように使用します.
CompletableFuture f = CompletableFuture
        .supplyAsync(() -> "hello")
        .thenApplyAsync(res -> res + " world!")
        .whenComplete((res, err) -> {
            if (err != null) {
                err.printStackTrace();
            } else {
                System.out.println(res);
            }
        });

// wait for job done
f.get();

5.組み合わせ
public  CompletableFuture thenCompose(Function super T, ? extends CompletionStage> fn) 
public  CompletableFuture thenComposeAsync(Function super T, ? extends CompletionStage> fn) 
public  CompletableFuture thenComposeAsync(Function super T, ? extends CompletionStage> fn,Executor executor) 
public  CompletableFuture thenCombine(CompletionStage extends U> other,BiFunction super T,? super U,? extends V> fn) 
public  CompletableFuture thenCombineAsync(CompletionStage extends U> other,BiFunction super T,? super U,? extends V> fn) 
public  CompletableFuture thenCombineAsync(CompletionStage extends U> other,BiFunction super T,? super U,? extends V> fn, Executor executor)

次のように使用します.
CompletableFuture f = CompletableFuture.supplyAsync(() -> "Hello")
        .thenCompose(res -> CompletableFuture.supplyAsync(() -> res + " World,"))
        .thenCombine(CompletableFuture.supplyAsync(() -> "CompletableFuture!"), (a, b) -> a + b);

String result = f.get();

System.out.println(result);//Hello World,CompletableFuture!


6.結果&異常処理
public  CompletableFuture handle(BiFunction super T, Throwable, ? extends U> fn) 
public  CompletableFuture handleAsync(BiFunction super T, Throwable, ? extends U> fn) 
public  CompletableFuture handleAsync(BiFunction super T, Throwable, ? extends U> fn, Executor executor) 
public CompletableFuture exceptionally(Function fn) 

次のように使用します.
//     
CompletableFuture f = CompletableFuture.supplyAsync(() -> "Hello")
        .thenApplyAsync(res -> res + "World")
        .thenApplyAsync(res -> {
            throw new RuntimeException("error");
        })
        .exceptionally(e -> {
            //handle exception here
            e.printStackTrace();
            return null;
        });
f.get();

//       
CompletableFuture f2 = CompletableFuture.supplyAsync(() -> "Hello")
        .thenApplyAsync(res -> res + "World")
        .thenApplyAsync(res -> {
            throw new RuntimeException("error");
        })
        .handleAsync((res, err) -> {
            if (err != null) {
                //handle exception here
                return null;
            } else {
                return res;
            }
        });

Object result = f2.get();

System.out.println(result);


7.非同期動作を並列に実行し、処理結果を統一する
public static CompletableFuture allOf(CompletableFuture>... cfs)

次のように使用します.
CompletableFuture f1 = CompletableFuture.supplyAsync(() -> "hello");
CompletableFuture f2 = CompletableFuture.supplyAsync(() -> "world");
CompletableFuture f3 = CompletableFuture.supplyAsync(() -> "!");

//   allOf  
CompletableFuture all = CompletableFuture.allOf(f1, f2, f3);
all.get();

System.out.println(f1.get());
System.out.println(f2.get());
System.out.println(f3.get());

//   StreamAPI
List result = Stream.of(f1, f2, f3)
        .map(CompletableFuture::join)
        .collect(Collectors.toList());

System.out.println(result);

まとめ
Java 7以前は、Javaでは非同期プログラミングの実装が複雑だったり、優雅ではなかったりする可能性がありましたが、CompletableFutureの登場は非同期プログラミングの強力な能力を提供していました.コードの読み取り性とメンテナンス性も一定に向上した.
リファレンス
  • Guide To CompletableFuture
  • Java CompletableFuture詳細