Java 8 CompletableFutureプログラミング

12227 ワード

一、紹介
 非同期呼び出しとは、実際には、呼び出された関数の戻り値を待つことなく操作を継続させる方法である.Java言語では、簡単に言えば、呼び出しの一部の計算を完了するために別のスレッドを開き、計算結果を待つことなく呼び出しを実行または戻すことができます.しかし、呼び出し者はスレッドの計算結果を取得する必要があります.
 JDK 5には、非同期計算の結果を記述するFutureインタフェースが追加されました.Futureおよび関連する使用方法は、タスクを非同期で実行する能力を提供するが、結果の取得には不便であり、ブロックまたはポーリングでのみタスクの結果を得ることができる.ブロックの方式は明らかに私たちの非同期プログラミングの初心に反して、ポーリングの方式はまた無意味なCPU資源を消費して、しかもタイムリーに計算結果を得ることができません.
private static final ExecutorService POOL = Executors.newFixedThreadPool(TASK_THRESHOLD, new ThreadFactory() {
        AtomicInteger atomicInteger = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "demo15-" + atomicInteger.incrementAndGet());
        }
    });

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Future submit = POOL.submit(() -> 123);
        // 1. get()           ,         ,  get        ,         
        Integer get = submit.get();
        // 2. isDone()         Future      。
        boolean done = submit.isDone();
        // 3. cancel(boolean mayInterruptIfRunning)          。                 。
        boolean cancel = submit.cancel(true);
        // 4. isCancelled()     task     .
        boolean cancelled = submit.isCancelled();
        // 5. invokeAll       
        Callable callable = () -> "Hello Future";
        List> callables = Lists.newArrayList(callable, callable, callable, callable);
        List> futures = POOL.invokeAll(callables);
    }

Java 8では、CompletableFutureは非常に強力なFutureの拡張機能を提供し、非同期プログラミングの複雑さを簡略化し、関数式プログラミングの能力を提供し、コールバック方式で計算結果を処理し、CompletableFutureを変換し、組み合わせる方法も提供しています.
tips:CompletionStageは非同期計算中のあるフェーズを表し、あるフェーズが完了すると別のフェーズがトリガーされる可能性があります.
二、CompletableFuture使用
1. runAsync、supplyAsync
//     
public static CompletableFuture runAsync(Runnable runnable)
public static CompletableFuture runAsync(Runnable runnable, Executor executor)
//     
public static  CompletableFuture supplyAsync(Supplier supplier)
public static  CompletableFuture supplyAsync(Supplier supplier, Executor executor)

runAsync、supplyAsyncメソッドは、CompletableFutureが提供する非同期オペレーションの作成方法です.スレッドプールとしてExecutorが指定されていない場合は、ForkJoinPool.commonPool()をスレッドプールとして使用して非同期コードを実行します.スレッドプールを指定する場合は、指定したスレッドプールを使用して実行します.以下のすべての方法は同じです.
public class Demo1 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture runAsync = CompletableFuture.runAsync(() -> System.out.println(123));

        CompletableFuture supplyAsync = CompletableFuture.supplyAsync(() -> "CompletableFuture");
        System.out.println(supplyAsync.get());
    }
}

2. whenComplete、exceptionally
//      ,              whenComplete    。
public CompletableFuture whenComplete(BiConsumer super T,? super Throwable> action)
//      ,  whenCompleteAsync                。
public CompletableFuture whenCompleteAsync(BiConsumer super T,? super Throwable> action)
public CompletableFuture whenCompleteAsync(BiConsumer super T,? super Throwable> action, Executor executor)
public CompletableFuture exceptionally(Function fn)

CompletableFutureの計算が完了するとwhenCompleteメソッドが実行されます.CompletableFuture計算で例外が放出されると、exceptionallyメソッドが実行されます.
public class Demo2 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture runAsync = CompletableFuture.supplyAsync(() -> 123456);
        runAsync.whenComplete((t, throwable) -> {
            System.out.println(t);
            if (throwable != null) {
                throwable.printStackTrace();
            }
        });
        runAsync.whenCompleteAsync((t, throwable) -> {
            System.out.println(t);
            if (throwable != null) {
                throwable.printStackTrace();
            }
        });
        runAsync.exceptionally((throwable) -> {
            if (throwable != null) {
                throwable.printStackTrace();
            }
            return null;
        });
        TimeUnit.SECONDS.sleep(2);
    }
}

3. thenApply、handle
// T:            
// U:          

public  CompletableFuture thenApply(Function super T,? extends U> fn)
public  CompletableFuture thenApplyAsync(Function super T,? extends U> fn)
public  CompletableFuture thenApplyAsync(Function super T,? extends U> fn, Executor executor)

public  CompletionStage handle(BiFunction super T, Throwable, ? extends U> fn);
public  CompletionStage handleAsync(BiFunction super T, Throwable, ? extends U> fn);
public  CompletionStage handleAsync(BiFunction super T, Throwable, ? extends U> fn,Executor executor);

1つのスレッドが別のスレッドに依存する場合、thenApplyメソッドを使用して2つのスレッドをシリアル化できます.
handleメソッドはthenApplyメソッドとほぼ同じです.異なるのはhandleがタスクが完了した後に実行され、異常なタスクを処理することもできます.thenApplyは通常のタスクのみを実行でき、タスクに異常が発生した場合はthenApplyメソッドは実行されません.
public class Demo3 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // thenApply
        CompletableFuture thenApply = CompletableFuture.supplyAsync(() -> 123).thenApply(t -> t * t);
        System.out.println(thenApply.get());

       // handle
        CompletableFuture handle = CompletableFuture.supplyAsync(() -> {
            int i = 10 / 0;
            return new Random().nextInt(10);
        }).handle((t, throwable) -> {
            if (throwable != null) {
                throwable.printStackTrace();
                return -1;
            }
            return t * t;
        });
        System.out.println(handle.get());
    }
}

4. thenAccept、thenRun
public CompletionStage thenAccept(Consumer super T> action);
public CompletionStage thenAcceptAsync(Consumer super T> action);
public CompletionStage thenAcceptAsync(Consumer super T> action,Executor executor);

public CompletionStage thenRun(Runnable action);
public CompletionStage thenRunAsync(Runnable action);
public CompletionStage thenRunAsync(Runnable action,Executor executor);

thenAcceptはタスクの処理結果を受信し,処理を消費する.結果は返されません.
thenRunとthenAcceptの方法が異なるのは,タスクの処理結果に関心がないことである.上記のタスクの実行が完了したらthenRunの実行を開始します.
public class Demo4 {

    public static void main(String[] args) {
        // thenAccept
        CompletableFuture thenAccept = CompletableFuture.supplyAsync(() -> new Random().nextInt(10)).thenAccept(System.out::println);

       // thenRun
        CompletableFuture thenRun = CompletableFuture.supplyAsync(() -> new Random().nextInt(10)).thenRun(() -> System.out.println(123));
    }
}

5. thenCombine、thenAcceptBoth
 // T       CompletionStage        
 // U       CompletionStage        
 // V   thenCombine/thenAcceptBoth       
public  CompletionStage thenCombine(CompletionStage extends U> other,BiFunction super T,? super U,? extends V> fn);
public  CompletionStage thenCombineAsync(CompletionStage extends U> other,BiFunction super T,? super U,? extends V> fn);
public  CompletionStage thenCombineAsync(CompletionStage extends U> other,BiFunction super T,? super U,? extends V> fn,Executor executor);

public  CompletionStage thenCombine(CompletionStage extends U> other,BiFunction super T,? super U,? extends V> fn);
public  CompletionStage thenCombineAsync(CompletionStage extends U> other,BiFunction super T,? super U,? extends V> fn);
public  CompletionStage thenCombineAsync(CompletionStage extends U> other,BiFunction super T,? super U,? extends V> fn,Executor executor);

thenCombine、thenAcceptBothは、両方のCompletionStageのタスクが完了するのを待って、両方のタスクの結果を一括して処理するために使用されます.違いはthenCombineに戻り値があることです.thenAcceptBothには戻り値がありません.
public class Demo5 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // thenCombine
        CompletableFuture thenCombine = CompletableFuture.supplyAsync(() -> new Random().nextInt(10))
                .thenCombine(CompletableFuture.supplyAsync(() -> "str"),
                        //           CompletionStage      
                        //           CompletionStage      
                        (i, s) -> i + s
                );
        System.out.println(thenCombine.get());

        // thenAcceptBoth 
        CompletableFuture thenAcceptBoth = CompletableFuture.supplyAsync(() -> new Random().nextInt(10))
                .thenAcceptBoth(CompletableFuture.supplyAsync(() -> "str"), 
                        (i, s) -> System.out.println(i + s));
    }
}

6. applyToEither、acceptEither、runAfterEither、runAfterBoth
  • applyToEither:2つのCompletionStageで、誰が戻りの結果を実行するのが速いか、そのCompletionStageの結果で次の処理を行い、戻り値があります.
  • acceptEither:2つのCompletionStageで、誰が戻りの結果を実行するのが速いか、そのCompletionStageの結果で次の処理を行い、戻り値はありません.
  • runAfterEither:2つのCompletionStageが完了すると、いずれも次の操作(Runnable)が実行され、戻り値はありません.
  • runAfterBoth:2つのCompletionStageは、計算が完了してから次の操作(Runnable)を実行し、戻り値はありません.

  • これらの方法は意味が近いため,より類似しているため,applyToEitherで紹介する...
    // T    CompletionStage           
    // U                
    public  CompletionStage applyToEither(CompletionStage extends T> other,Function super T, U> fn);
    public  CompletionStage applyToEitherAsync(CompletionStage extends T> other,Function super T, U> fn);
    public  CompletionStage applyToEitherAsync(CompletionStage extends T> other,Function super T, U> fn,Executor executor);
    public class Demo6 {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            CompletableFuture applyToEither = CompletableFuture.supplyAsync(() -> {
                int nextInt = new Random().nextInt(10);
                try {
                    Thread.sleep(nextInt);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("f1=" + nextInt);
                return nextInt;
            }).applyToEither(CompletableFuture.supplyAsync(() -> {
                int nextInt = new Random().nextInt(10);
                try {
                    Thread.sleep(nextInt);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("f2=" + nextInt);
                return nextInt;
            }), i -> i);
    
            System.out.println(applyToEither.get());
        }
    }

    7. thenCompose
    public  CompletableFuture thenCompose(Function super T, ? extends CompletionStage> fn);
    public  CompletableFuture thenComposeAsync(Function super T, ? extends CompletionStage> fn) ;
    public  CompletableFuture thenComposeAsync(Function super T, ? extends CompletionStage> fn, Executor executor) ;

    thenComposeメソッドでは、2つのCompletionStageに対して流水線操作を行い、最初の操作が完了すると、その結果をパラメータとして2番目の操作に渡すことができます.
    public class Demo7 {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            CompletableFuture thenCompose = CompletableFuture.supplyAsync(() -> new Random().nextInt(10))
                    .thenCompose(i -> CompletableFuture.supplyAsync(() -> i * i));
            System.out.println(thenCompose.get());
    
        }
    }

    参考文献:https://www.jianshu.com/p/6bac52527ca4