フクロウの深夜訳:JAVA CompletableFutureを使った20例


前言
このブログでは、JAVA 8のCompletionStage APIと、JAVAライブラリ内の標準実装CompletableFutureをレビューします.APIの様々な動作は、いくつかの例によって示される.CompletableFutureCompletionInterfaceインタフェースの実装であるため、まずこのインタフェースの契約を理解しなければならない.同期または非同期計算のフェーズを表します.最終的な結果を生み出すための計算のための流水ライン上のユニットと理解することができます.これは、複数のComletionStage命令がリンクされ、1つのフェーズの完了が次のフェーズの実行をトリガすることができることを意味する.CompletionStageインターフェースの実装に加えて、Completionは、開始していない非同期イベントを実装するために使用されるFutureを継承する.Futureを明示的に完成させることができるため、CompletableFutureと名付けられた.
1.完成したCompletableFutureを新規作成
この簡単な例では、完了した予め設定された結果のCompletableFutureが作成される.通常、計算の開始段階として使用されます.
static void completedFutureExample() {
    CompletableFuture cf = CompletableFuture.completedFuture("message");
    assertTrue(cf.isDone());
    assertEquals("message", cf.getNow(null));
}
getNowメソッドは、完了後の結果(ここではmessage)を返し、まだ完了していない場合は、入力されたデフォルト値nullを返します.
2.単純な非同期ステージを実行する
次の例では、非同期運転Runnableのstageを作成する方法を説明します.
static void runAsyncExample() {
    CompletableFuture cf = CompletableFuture.runAsync(() -> {
        assertTrue(Thread.currentThread().isDaemon());
        randomSleep();
    });
    assertFalse(cf.isDone());
    sleepEnough();
    assertTrue(cf.isDone());
}

この例では、2つのことを説明します.
  • CompletableFutureのうちAsyncを最後とする方法は、
  • を非同期で実行する.
  • のデフォルトでは、非同期実行は、Executorタスクを実行するためにバックグラウンドスレッドを使用するForkJoinPoolを使用して実装されます.これは、Runnableインプリメンテーションに固有であり、他のCompletableFutureインプリメンテーションは、デフォルトの動作を書き換えることができることに留意されたい.

  • 3.前のステージにメソッドを適用
    次の例は、生成された文字列結果を参照し、その文字列を大文字化する第1の例で完了したCompletableStageを参照する.
    static void thenApplyExample() {
        CompletableFuture cf = CompletableFuture.completedFuture("message").thenApply(s -> {
            assertFalse(Thread.currentThread().isDaemon());
            return s.toUpperCase();
        });
        assertEquals("MESSAGE", cf.getNow(null));
    }

    ここでのキーワードはCompletableFutureです.
  • thenApplyとは、現段階で正常に実行された後(正常に実行された場合は異常が放出されていないこと)に行われる動作を指す.この例では、現在のフェーズが完了し、値thenが得られる.
  • messageとは、1つのApplyを前の段階に作用する結果
  • を意味する.Functionはブロックされており、これは、大文字操作の実行が完了した後にのみFunctionの方法が実行されることを意味する.
    4.非同期的なアプローチを前のステージに適用
    メソッドの後にgetNow()接尾辞を追加すると、Asyncチェーンは非同期で実行されます(ForkJoinPool.commonPool()を使用します).
    static void thenApplyAsyncExample() {
        CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
            assertTrue(Thread.currentThread().isDaemon());
            randomSleep();
            return s.toUpperCase();
        });
        assertNull(cf.getNow(null));
        assertEquals("MESSAGE", cf.join());
    }

    このメソッドを非同期で実行するには、カスタムExecutorを使用します.
    非同期方法の1つの利点は、CompletableFutureを実行するためにExecutorを提供することである.この例では、大文字操作を実現するために固定サイズのスレッドプールを使用する方法を示します.
    static ExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() {
        int count = 1;
        @Override
        public Thread newThread(Runnable runnable) {
            return new Thread(runnable, "custom-executor-" + count++);
        }
    });
    static void thenApplyAsyncWithExecutorExample() {
        CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
            assertTrue(Thread.currentThread().getName().startsWith("custom-executor-"));
            assertFalse(Thread.currentThread().isDaemon());
            randomSleep();
            return s.toUpperCase();
        }, executor);
        assertNull(cf.getNow(null));
        assertEquals("MESSAGE", cf.join());
    }

    6.コンシューマの前ステージの結果
    次のステージが現在のステージの結果を受信し、計算中に値を返す必要がない場合(例えば、その戻り値がvoidである場合)、方法CompletableStageが使用され、thenAcceptインタフェースに送信される.
    static void thenAcceptExample() {
        StringBuilder result = new StringBuilder();
        CompletableFuture.completedFuture("thenAccept message")
                .thenAccept(s -> result.append(s));
        assertTrue("Result was empty", result.length() > 0);
    }
    Consumerは同期して実行されるので、戻るConsumerでjoin操作を実行する必要はありません.
    7.非同期実行Commsume
    同様に、Asyn接尾辞を使用して実装します.
    static void thenAcceptAsyncExample() {
        StringBuilder result = new StringBuilder();
        CompletableFuture cf = CompletableFuture.completedFuture("thenAcceptAsync message")
                .thenAcceptAsync(s -> result.append(s));
        cf.join();
        assertTrue("Result was empty", result.length() > 0);
    }

    8.計算に異常が発生した場合
    異常が発生したシーンをシミュレートします.簡潔性のために文字列を大文字にしますが、遅延をシミュレートします.CompletableFutureを使用します.最初のパラメータは大文字変換方法で、2番目のパラメータは1秒遅れてthenApplyAsyn(Function, Executor)に操作をコミットする遅延executorです.
    static void completeExceptionallyExample() {
        CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,
                CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
        CompletableFuture exceptionHandler = cf.handle((s, th) -> { return (th != null) ? "message upon cancel" : ""; });
        cf.completeExceptionally(new RuntimeException("completed exceptionally"));
        assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally());
        try {
            cf.join();
            fail("Should have thrown an exception");
        } catch(CompletionException ex) { // just for testing
            assertEquals("completed exceptionally", ex.getCause().getMessage());
        }
        assertEquals("message upon cancel", exceptionHandler.join());
    }
  • まず、ForkJoinPoolの戻り値を持つ完了したmessageオブジェクトを新規に作成しました.次に、CompletableFutureメソッドを呼び出し、新しいthenApplyAsyncを返します.この方法は非同期で大文字操作を実行する.ここでは、非同期動作を遅延させるためにCompletableFuture法を使用する方法も示した.
  • その後、handler stage、delayedExecutor(timeout, timeUnit)を作成しました.このフェーズでは、すべての例外を処理し、別のメッセージexceptionHandlerに戻ります.
  • 最後に、2番目のフェーズを明示的に完了し、異常を放出し、大文字で操作するフェーズmessage upon cancelを放出します.CompletionExceptionフェーズもトリガーされます.

  • API補足:handler
    前のステージが正常に動作しているかどうかにかかわらず、新しいCompletionStageを返します.入力されたパラメータには、前のフェーズの結果と例外が含まれます.
    9.計算を取り消す
    計算時の異常処理とよく似ており, CompletableFuture handle(BiFunction super T,Throwable,? extends U> fn)インタフェースのFutureによって計算を取り消すことができる.
    
    static void cancelExample() {
        CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,
                CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
        CompletableFuture cf2 = cf.exceptionally(throwable -> "canceled message");
        assertTrue("Was not canceled", cf.cancel(true));
        assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally());
        assertEquals("canceled message", cf2.join());
    }

    API補足cancel(boolean mayInterruptIfRunning)
    新しいCompletableFutureを返します.例外が発生した場合は、メソッドで実行された結果、そうでない場合は正常に実行された結果です.
    10.Functionを完了した2つのステージの結果の1つに適用
    次の例では、public CompletableFuture exceptionally(Function fn)オブジェクトが作成され、完了した2つのステージのいずれかにCompletableFutureが作用します(いずれがFunctionに渡されるかは保証されていません).この2つのフェーズは、1つは文字列を大文字にし、もう1つは小文字にします.
    static void applyToEitherExample() {
        String original = "Message";
        CompletableFuture cf1 = CompletableFuture.completedFuture(original)
                .thenApplyAsync(s -> delayedUpperCase(s));
        CompletableFuture cf2 = cf1.applyToEither(
                CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
                s -> s + " from applyToEither");
        assertTrue(cf2.join().endsWith(" from applyToEither"));
    }
    Function
    新しいCompletableFutureを返します.thisまたはother操作が完了した後、両方のいずれかがfnを実行します.
    11.消費の2段階のいずれかの結果
    前の例と同様に、public CompletableFuture applyToEitherAsync(CompletionStage extends T> other,Function super T,U> fn)Functionに置き換える
    static void acceptEitherExample() {
        String original = "Message";
        StringBuilder result = new StringBuilder();
        CompletableFuture cf = CompletableFuture.completedFuture(original)
                .thenApplyAsync(s -> delayedUpperCase(s))
                .acceptEither(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
                        s -> result.append(s).append("acceptEither"));
        cf.join();
        assertTrue("Result was empty", result.toString().endsWith("acceptEither"));
    }

    12.両方のフェーズが完了した後にRunnableを実行
    ここで2つのステージは同期して実行され、最初のステージは文字列を大文字に変換した後、2番目のステージは小文字に変換されます.
    static void runAfterBothExample() {
        String original = "Message";
        StringBuilder result = new StringBuilder();
        CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).runAfterBoth(
                CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
                () -> result.append("done"));
        assertTrue("Result was empty", result.length() > 0);
    }

    13.Biconsumerで2つのステージの結果を受信
    Biconsumerは、2つのステージの結果を同時に操作することをサポートします.
    static void thenAcceptBothExample() {
        String original = "Message";
        StringBuilder result = new StringBuilder();
        CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).thenAcceptBoth(
                CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
                (s1, s2) -> result.append(s1 + s2));
        assertEquals("MESSAGEmessage", result.toString());
    }

    14.Bifunctionを2段階に同時に作用させた結果Consumerが2つのフェーズの結果を集計し、値を返す場合、方法CompletableFutureを使用することができる.ここでの計算フローはすべて同期されるので、最後のthenCombineメソッドは、最終結果、すなわち、大文字操作と小文字操作の結果の接合を得ることができる.
    static void thenCombineExample() {
        String original = "Message";
        CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s))
                .thenCombine(CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)),
                        (s1, s2) -> s1 + s2);
        assertEquals("MESSAGEmessage", cf.getNow(null));
    }

    15.非同期でBifunctionを2段階に同時に作用させた結果
    以前の例と同様に、ここでは異なる方法を使用しています.すなわち、2つの段階の操作は非同期です.getNow()も非同期で実行され、Async接尾辞がない場合があります.
    static void thenCombineAsyncExample() {
        String original = "Message";
        CompletableFuture cf = CompletableFuture.completedFuture(original)
                .thenApplyAsync(s -> delayedUpperCase(s))
                .thenCombine(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
                        (s1, s2) -> s1 + s2);
        assertEquals("MESSAGEmessage", cf.join());
    }

    16.Compose CompletableFuture thenCombineを用いて、前の2つの例の動作を完了することができる.
    static void thenComposeExample() {
        String original = "Message";
        CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s))
                .thenCompose(upper -> CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s))
                        .thenApply(s -> upper + s));
        assertEquals("MESSAGEmessage", cf.join());
    }

    17.複数のステージでどのような完了があるか、すなわち新しい完了ステージを作成する
    static void anyOfExample() {
        StringBuilder result = new StringBuilder();
        List messages = Arrays.asList("a", "b", "c");
        List futures = messages.stream()
                .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))
                .collect(Collectors.toList());
        CompletableFuture.anyOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((res, th) -> {
            if(th == null) {
                assertTrue(isUpperCase((String) res));
                result.append(res);
            }
        });
        assertTrue("Result was empty", result.length() > 0);
    }

    18.すべてのフェーズが完了すると、新しい完了フェーズが作成されます.
    
    static void allOfExample() {
        StringBuilder result = new StringBuilder();
        List messages = Arrays.asList("a", "b", "c");
        List futures = messages.stream()
                .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))
                .collect(Collectors.toList());
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
            .whenComplete((v, th) -> {
                futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
                result.append("done");
            });
        assertTrue("Result was empty", result.length() > 0);
    }

    19.すべてのフェーズが完了したら、非同期完了フェーズを新規作成する
    
    static void allOfAsyncExample() {
        StringBuilder result = new StringBuilder();
        List messages = Arrays.asList("a", "b", "c");
        List futures = messages.stream()
                .map(msg -> CompletableFuture.completedFuture(msg).thenApplyAsync(s -> delayedUpperCase(s)))
                .collect(Collectors.toList());
        CompletableFuture allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
                .whenComplete((v, th) -> {
                    futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
                    result.append("done");
                });
        allOf.join();
        assertTrue("Result was empty", result.length() > 0);
    }

    20.リアルシーン
    次に、CompletableFutureを実践するシーンを示します.
  • は、thenComposeメソッドを呼び出すことによって、cars()リストを非同期で取得する.Carを返します.CompletionStage>方法は、リモートRESTエンドポイントを使用して実装されるべきである.
  • このStageと別のStageを組み合わせ、別のStageはcars()を呼び出すことで各車のスコアを非同期で取得します.
  • すべてのCarオブジェクトがスコアを記入すると、rating(manufactureId)を呼び出して最終ステージに入ります.この2つのフェーズが完了すると
  • が実行されます.
  • は、最終ステージ上でallOf()を使用して車両のスコアを印刷する.
  • cars().thenCompose(cars -> {
        List updatedCars = cars.stream()
                .map(car -> rating(car.manufacturerId).thenApply(r -> {
                    car.setRating(r);
                    return car;
                })).collect(Collectors.toList());
        CompletableFuture done = CompletableFuture
                .allOf(updatedCars.toArray(new CompletableFuture[updatedCars.size()]));
        return done.thenApply(v -> updatedCars.stream().map(CompletionStage::toCompletableFuture)
                .map(CompletableFuture::join).collect(Collectors.toList()));
    }).whenComplete((cars, th) -> {
        if (th == null) {
            cars.forEach(System.out::println);
        } else {
            throw new RuntimeException(th);
        }
    }).toCompletableFuture().join();

    参考資料
    Java CompletableFutureの詳細
    Guide To CompletableFuture
    より多くの開発技術、面接チュートリアル、インターネット会社のプッシュを知りたいなら、私の微信の公衆番号に注目してください.不定期で福祉が支給されますよ~