エレガントな非同期コードの作成方法-CompletableFuture


前言
私たちの意識では、同期して実行されるプログラムは人々の考え方に合っていますが、非同期のものは通常処理しにくいです.非同期計算の場合、コールバックで表される動作はコードに分散したり、内部にネストされたりすることが多く、いずれかのステップで発生する可能性のあるエラーを処理する必要がある場合、状況はさらに悪化します.Java 8は、CompletableFutureクラスの導入を含む多くの新しい特性を導入しており、これにより、50以上の方法を含む明確で読み取り可能な非同期コードの作成が容易になります.
CompletableFutureとはCompletableFutureクラスの設計のインスピレーションはGoogle GuavaListenableFutureクラスから来て、それはFutureCompletionStageインタフェースを実現してそして多くの方法を追加して、それはlambdaをサポートして、コールバックを通じて非ブロック方法を利用して、非同期プログラミングモデルを昇格させました.これにより、プライマリ・アプリケーション・スレッドとは異なるスレッド(すなわち非同期)でタスクを実行し、タスクの進捗、完了、または失敗をプライマリ・スレッドに通知することで、非ブロック・コードを記述できます.
なぜCompletableFutureを導入するのかJavaの1.5バージョンにはFutureが導入されており、演算結果を取得するための2つの方法を提供する演算結果のプレースホルダとして簡単に理解できます.
  • get():メソッドスレッドを呼び出すと、演算結果が無期限に待機します.
  • get(long timeout, TimeUnit unit):メソッドスレッドを呼び出すと、指定された時間timeout内でのみ結果が待機し、待機タイムアウトするとTimeoutException異常が放出されます.
  • Futureは、RunnableまたはCallableインスタンスを使用してコミットされたタスクを完了することができ、そのソースコードから、次のようないくつかの問題があることがわかります.
  • ブロック呼び出しget()メソッドは、計算が完了するまでブロックされ、完了時に通知できるメソッドは提供されず、コールバック関数を追加する機能も持たない.
  • チェーン呼び出しと結果集約処理は、多くの場合、複数のFutureをリンクして時間のかかる計算を完了したい場合があります.この場合、結果を統合して別のタスクに送信する必要があります.このインタフェースでは、この処理を完了するのは難しいです.
  • 異常処理Future異常処理は一切提供されていない.

  • これらの問題はCompletableFutureで解決されています.次に、CompletableFutureをどのように使用するかを見てみましょう.
    CompletableFutureの作成方法
    最も簡単な作成方法は、完了したCompletableFuture.completedFuture(U value)オブジェクトを取得するためにCompletableFutureメソッドを呼び出すことです.
    @Test
    public void testSimpleCompletableFuture() {
        CompletableFuture completableFuture = CompletableFuture.completedFuture("Hello mghio");
        assertTrue(completableFuture.isDone());
        try {
            assertEquals("Hello mghio", completableFuture.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    不完全なCompleteableFuture呼び出しgetメソッドの場合、Futureが完了していないため、get呼び出しは永遠にブロックされ、この場合CompletableFuture.completeメソッドを使用して手動でFutureを完了することができることに注意してください.
    タスク非同期処理
    プログラムがバックグラウンドでタスクの処理結果に関心を持たずにタスクを非同期で実行させたい場合、runAsyncメソッドを使用して、RunnableタイプのパラメータがCompletableFutureを返すことを受信することができる.
    @Test
    public void testCompletableFutureRunAsync() {
        AtomicInteger variable = new AtomicInteger(0);
        CompletableFuture runAsync = CompletableFuture.runAsync(() -> process(variable));
        runAsync.join();
        assertEquals(100, variable.get());
    }
    
    public void process(AtomicInteger variable) {
        System.out.println(Thread.currentThread() + " Process...");
        variable.set(100);
    }

    バックグラウンドでタスクを非同期で実行し、タスクの処理結果を取得する必要がある場合は、supplyAsyncメソッドを使用して、Supplierタイプのパラメータを受信してCompletableFutureを返すことができます.
    @Test
    public void testCompletableFutureSupplyAsync() {
        CompletableFuture supplyAsync = CompletableFuture.supplyAsync(this::process);
        try {
            assertEquals("Hello mghio", supplyAsync.get()); // Blocking
        } catch (ExecutionException | InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    public String process() {
        return "Hello mghio";
    } 

    ここで質問があるかもしれませんが、runAsyncsupplyAsyncのタスクを実行するスレッドはどこから来たのか、誰が作成したのか.実際にはJava 8のparallelStreamと同様であり、CompletableFutureもグローバルForkJoinPool.commonPool()から得られたスレッドからこれらのタスクを実行します.また、上記の2つの方法では、カスタムスレッドプールを使用してタスクを実行する方法も提供されています.実際には、CompletableFutureのソースコードを理解すると、APIのすべての方法にリロードされたバージョンがあり、カスタムExecutorアクチュエータがあるかどうかがわかります.
    @Test
    public void testCompletableFutureSupplyAsyncWithExecutor() {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        CompletableFuture supplyAsync = CompletableFuture.supplyAsync(this::process, newFixedThreadPool);
        try {
            assertEquals("Hello mghio", supplyAsync.get()); // Blocking
        } catch (ExecutionException | InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    public String process() {
        return "Hello mghio";
    }

    チェーン呼び出しと結果集約処理CompletableFutureget()メソッドは、結果が得られるまで メソッドを継続することを知っています.CompletableFutureは、thenApplyメソッド、thenAcceptメソッド、およびthenRunメソッドなどを提供して、このような状況を回避し、タスク完了後のコールバック通知を追加することもできます.これらの方法の使用シーンは次のとおりです.
  • thenApplyFutureから値を受信したタスクの前にカスタムビジネスコードを実行し、そのタスクにいくつかの値を返す場合は、
  • を使用します.
  • thenAcceptFutureからいくつかの値を受信した後にタスクを実行する前に、結果値を返すことなくカスタムビジネスコードを実行したい場合は、
  • を使用します.
  • thenRunこのメソッドは、Futureが完了した後にカスタムビジネスコードを実行し、その値を返さないようにする場合に使用できます
  • @Test
    public void testCompletableFutureThenApply() {
        Integer notificationId = CompletableFuture.supplyAsync(this::thenApplyProcess)
            .thenApply(this::thenApplyNotify) // Non Blocking
            .join();
        assertEquals(new Integer(1), notificationId);
    }
    
    @Test
    public void testCompletableFutureThenAccept() {
        CompletableFuture.supplyAsync(this::processVariable)
            .thenAccept(this::thenAcceptNotify) // Non Blocking
            .join();
        assertEquals(100, variable.get());
    }
    
    @Test
    public void testCompletableFutureThenRun() {
        CompletableFuture.supplyAsync(this::processVariable)
            .thenRun(this::thenRunNotify)
            .join();
        assertEquals(100, variable.get());
    }
    
    private String processVariable() {
        variable.set(100);
        return "success";
    }
    
    private void thenRunNotify() {
        System.out.println("thenRun completed notify ....");
    }
    
    private Integer thenApplyNotify(Integer integer) {
        return integer;
    }
    
    private void thenAcceptNotify(String s) {
        System.out.println(
        String.format("Thread %s completed notify ....", Thread.currentThread().getName()));
    }
    
    public Integer thenApplyProcess() {
        return 1;
    }

    大量の非同期計算がある場合は、あるコールバックから別のコールバックに値を渡し続けることができます.つまり、チェーン呼び出し方式を使用すると、簡単に使用できます.
    @Test
    public void testCompletableFutureThenApplyAccept() {
        CompletableFuture.supplyAsync(this::findAccountNumber)
            .thenApply(this::calculateBalance)
            .thenApply(this::notifyBalance)
            .thenAccept((i) -> notifyByEmail()).join();
    }
    
    private void notifyByEmail() {
        // business code
        System.out.println("send notify by email ...");
    }
    
    private Double notifyBalance(Double d) {
        // business code
        System.out.println(String.format("your balance is $%s", d));
        return 9527D;
    }
    
    private Double calculateBalance(Object o) {
        // business code
        return 9527D;
    }
    
    private Double findAccountNumber() {
        // business code
        return 9527D;
    }

    注意深い友人は、前のいくつかの方法の例の中で、すべての方法が同じスレッド上で実行されていることに気づくかもしれません.これらのタスクを個別のスレッドで実行する場合は、これらのメソッドに対応する非同期バージョンを使用できます.
    @Test
    public void testCompletableFutureApplyAsync() {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors
            .newSingleThreadScheduledExecutor();
        CompletableFuture completableFuture =
            CompletableFuture
                .supplyAsync(this::findAccountNumber,
                    newFixedThreadPool) //      newFixedThreadPool         
                .thenApplyAsync(this::calculateBalance,
                    newSingleThreadScheduledExecutor)
                .thenApplyAsync(this::notifyBalance);
        Double balance = completableFuture.join();
        assertEquals(9527D, balance);
    }

    結果処理の実行thenComposeメソッドは、アカウント残高を計算するビジネスなど、依存性のあるタスク処理に適しています.まず、アカウントを見つけてから、そのアカウントの残高を計算し、計算が完了してから通知を送信します.これらのタスクはすべて、前のタスクの戻りCompletableFuture結果に依存します.この場合、thenComposeメソッドを使用する必要があります.Java 8ストリームのflatMap操作に似ています.
    @Test
    public void testCompletableFutureThenCompose() {
        Double balance = this.doFindAccountNumber()
            .thenCompose(this::doCalculateBalance)
            .thenCompose(this::doSendNotifyBalance).join();
        assertEquals(9527D, balance);
    }
    
    private CompletableFuture doSendNotifyBalance(Double aDouble) {
        sleepSeconds(2);
        // business code
        System.out.println(String.format("%s doSendNotifyBalance ....", Thread.currentThread().getName()));
        return CompletableFuture.completedFuture(9527D);
    }
    
    private CompletableFuture doCalculateBalance(Double d) {
        sleepSeconds(2);
        // business code
        System.out.println(String.format("%s doCalculateBalance ....", Thread.currentThread().getName()));
        return CompletableFuture.completedFuture(9527D);
    }
    
    private CompletableFuture doFindAccountNumber() {
        sleepSeconds(2);
        // business code
        System.out.println(String.format("%s doFindAccountNumber ....", Thread.currentThread().getName()));
        return CompletableFuture.completedFuture(9527D);
    }
    
    private void sleepSeconds(int timeout) {
        try {
            TimeUnit.SECONDS.sleep(timeout);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    thenCombineメソッドは、主に複数の独立したタスクをマージするための処理結果である.人の名前と住所を検索する必要があると仮定すると、異なるタスクを使用してそれぞれ取得し、その人の完全な情報(名前+住所)を取得するには、この2つの方法の結果を統合する必要があります.では、thenCombine方法を使用することができます.
    @Test
    public void testCompletableFutureThenCombine() {
        CompletableFuture thenCombine = this.findName().thenCombine(this.findAddress(), (name, address) -> name + address);
        String personInfo = thenCombine.join();
        assertEquals("mghio Shanghai, China", personInfo);
    }
    
    private CompletableFuture findAddress() {
        return CompletableFuture.supplyAsync(() -> {
            sleepSeconds(2);
            // business code
            return "Shanghai, China";
        });
    }
    
    private CompletableFuture findName() {
        return CompletableFuture.supplyAsync(() -> {
            sleepSeconds(2);
            // business code
            return "mghio ";
        });
    }

    複数のタスクの実行完了を待つ
    多くの場合、複数のタスクを並列に実行し、すべてのタスクが完了した後に処理します.3人の異なるユーザーの名前を検索し、結果をマージするとします.この場合、CompletableFutureの静的メソッドallOfを使用することができます.このメソッドは、すべてのタスクの完了を待つことになります.このメソッドは、すべてのタスクのマージ結果を返さないので、タスクの実行結果を手動で組み合わせる必要があります.
    @Test
    public void testCompletableFutureAllof() {
        List> list = Lists.newArrayListWithCapacity(4);
        IntStream.range(0, 3).forEach(num -> list.add(findName(num)));
    
        CompletableFuture allFuture = CompletableFuture
            .allOf(list.toArray(new CompletableFuture[0]));
    
        CompletableFuture> allFutureList = allFuture
            .thenApply(val -> list.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    
        CompletableFuture futureHavingAllValues = allFutureList
            .thenApply(fn -> String.join("", fn));
    
        String result = futureHavingAllValues.join();
        assertEquals("mghio0mghio1mghio2", result);
    }
    
    private CompletableFuture findName(int num) {
        return CompletableFuture.supplyAsync(() -> {
            sleepSeconds(2);
            // business code
            return "mghio" + num;
        });
    } 

    例外処理
    マルチスレッドではプログラム異常は実際にはうまく処理されていませんが、幸いなことにCompletableFutureでは便利な異常処理方法を提供してくれました.
    @Test
    public void testCompletableFutureThenCompose() {
        Double balance = this.doFindAccountNumber()
            .thenCompose(this::doCalculateBalance)
            .thenCompose(this::doSendNotifyBalance).join();
    }

    上記のコードでは、3つのメソッドdoFindAccountNumberdoCalculateBalanceおよびdoSendNotifyBalanceのいずれかに異常が発生した場合、その後に呼び出されるメソッドは実行されません.CompletableFutureは、exceptionallyhandleおよびwhenCompleteメソッドの3つの処理異常を提供する.1つ目の方法はexceptionallyメソッドを使用して異常を処理し、前のメソッドが失敗して異常が発生した場合、異常コールバックが呼び出されます.
    @Test
    public void testCompletableFutureExceptionally() {
        CompletableFuture thenApply = CompletableFuture.supplyAsync(this::findAccountNumber)
            .thenApply(this::calculateBalance)
            .thenApply(this::notifyBalance)
            .exceptionally(ex -> {
                System.out.println("Exception " + ex.getMessage());
                return 0D;
            });
        Double join = thenApply.join();
        assertEquals(9527D, join);
    }

    第2の方法はhandleメソッドを使用して異常を処理することであり、このメソッドを使用して異常を処理することは、上記のexceptionallyメソッドよりも柔軟であり、異常オブジェクトと現在の処理結果を同時に取得することができる.
    @Test
    public void testCompletableFutureHandle() {
        CompletableFuture.supplyAsync(this::findAccountNumber)
            .thenApply(this::calculateBalance)
            .thenApply(this::notifyBalance)
            .handle((ok, ex) -> {
                System.out.println("        ...");
                if (ok != null) {
                System.out.println("No Exception !!");
                } else {
                System.out.println("Exception " + ex.getMessage());
                return -1D;
                }
                return ok;
            });
    }

    3つ目はwhenCompleteメソッドを使用して異常を処理することである.
    @Test
    public void testCompletableFutureWhenComplete() {
        CompletableFuture.supplyAsync(this::findAccountNumber)
            .thenApply(this::calculateBalance)
            .thenApply(this::notifyBalance)
            .whenComplete((result, ex) -> {
                System.out.println("result = " + result + ", ex = " + ex);
                System.out.println("        ...");
            });
    }

    まとめ
    本文では、CompletableFutureクラスの一部の方法と使用方法を紹介した.このクラスの方法の多くは同時に提供される機能も非常に強く、非同期プログラミングで使用されることが多い.基本的な使用方法を熟知した後、ソース分析の原理を深く理解するか、それとも深く理解するか.