エレガントな非同期コードの作成方法-CompletableFuture
前言
私たちの意識では、同期して実行されるプログラムは人々の考え方に合っていますが、非同期のものは通常処理しにくいです.非同期計算の場合、コールバックで表される動作はコードに分散したり、内部にネストされたりすることが多く、いずれかのステップで発生する可能性のあるエラーを処理する必要がある場合、状況はさらに悪化します.Java 8は、CompletableFutureクラスの導入を含む多くの新しい特性を導入しており、これにより、50以上の方法を含む明確で読み取り可能な非同期コードの作成が容易になります.
CompletableFutureとは
なぜCompletableFutureを導入するのか ブロック呼び出し チェーン呼び出しと結果集約処理は、多くの場合、複数の 異常処理
これらの問題は
CompletableFutureの作成方法
最も簡単な作成方法は、完了した
不完全な
タスク非同期処理
プログラムがバックグラウンドでタスクの処理結果に関心を持たずにタスクを非同期で実行させたい場合、
バックグラウンドでタスクを非同期で実行し、タスクの処理結果を取得する必要がある場合は、
ここで質問があるかもしれませんが、
チェーン呼び出しと結果集約処理thenApply を使用します.thenAccept を使用します.thenRunこのメソッドは、Futureが完了した後にカスタムビジネスコードを実行し、その値を返さないようにする場合に使用できます
大量の非同期計算がある場合は、あるコールバックから別のコールバックに値を渡し続けることができます.つまり、チェーン呼び出し方式を使用すると、簡単に使用できます.
注意深い友人は、前のいくつかの方法の例の中で、すべての方法が同じスレッド上で実行されていることに気づくかもしれません.これらのタスクを個別のスレッドで実行する場合は、これらのメソッドに対応する非同期バージョンを使用できます.
結果処理の実行
複数のタスクの実行完了を待つ
多くの場合、複数のタスクを並列に実行し、すべてのタスクが完了した後に処理します.3人の異なるユーザーの名前を検索し、結果をマージするとします.この場合、
例外処理
マルチスレッドではプログラム異常は実際にはうまく処理されていませんが、幸いなことに
上記のコードでは、3つのメソッド
第2の方法は
3つ目は
まとめ
本文では、
私たちの意識では、同期して実行されるプログラムは人々の考え方に合っていますが、非同期のものは通常処理しにくいです.非同期計算の場合、コールバックで表される動作はコードに分散したり、内部にネストされたりすることが多く、いずれかのステップで発生する可能性のあるエラーを処理する必要がある場合、状況はさらに悪化します.Java 8は、CompletableFutureクラスの導入を含む多くの新しい特性を導入しており、これにより、50以上の方法を含む明確で読み取り可能な非同期コードの作成が容易になります.
CompletableFutureとは
CompletableFuture
クラスの設計のインスピレーションはGoogle Guava
のListenableFutureクラスから来て、それはFuture
とCompletionStage
インタフェースを実現してそして多くの方法を追加して、それはlambdaをサポートして、コールバックを通じて非ブロック方法を利用して、非同期プログラミングモデルを昇格させました.これにより、プライマリ・アプリケーション・スレッドとは異なるスレッド(すなわち非同期)でタスクを実行し、タスクの進捗、完了、または失敗をプライマリ・スレッドに通知することで、非ブロック・コードを記述できます.なぜCompletableFutureを導入するのか
Java
の1.5バージョンにはFuture
が導入されており、演算結果を取得するための2つの方法を提供する演算結果のプレースホルダとして簡単に理解できます.get()
:メソッドスレッドを呼び出すと、演算結果が無期限に待機します.get(long timeout, TimeUnit unit)
:メソッドスレッドを呼び出すと、指定された時間timeout
内でのみ結果が待機し、待機タイムアウトするとTimeoutException
異常が放出されます.Future
は、Runnable
またはCallable
インスタンスを使用してコミットされたタスクを完了することができ、そのソースコードから、次のようないくつかの問題があることがわかります.get()
メソッドは、計算が完了するまでブロックされ、完了時に通知できるメソッドは提供されず、コールバック関数を追加する機能も持たない.Future
をリンクして時間のかかる計算を完了したい場合があります.この場合、結果を統合して別のタスクに送信する必要があります.このインタフェースでは、この処理を完了するのは難しいです.Future
異常処理は一切提供されていない.これらの問題は
CompletableFuture
で解決されています.次に、CompletableFuture
をどのように使用するかを見てみましょう.CompletableFutureの作成方法
最も簡単な作成方法は、完了した
CompletableFuture.completedFuture(U value)
オブジェクトを取得するためにCompletableFuture
メソッドを呼び出すことです.@Test
public void testSimpleCompletableFuture() {
CompletableFuture completableFuture = CompletableFuture.completedFuture("Hello mghio");
assertTrue(completableFuture.isDone());
try {
assertEquals("Hello mghio", completableFuture.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
不完全な
CompleteableFuture
呼び出しget
メソッドの場合、Future
が完了していないため、get
呼び出しは永遠にブロックされ、この場合CompletableFuture.complete
メソッドを使用して手動でFuture
を完了することができることに注意してください.タスク非同期処理
プログラムがバックグラウンドでタスクの処理結果に関心を持たずにタスクを非同期で実行させたい場合、
runAsync
メソッドを使用して、Runnable
タイプのパラメータがCompletableFuture
を返すことを受信することができる.@Test
public void testCompletableFutureRunAsync() {
AtomicInteger variable = new AtomicInteger(0);
CompletableFuture runAsync = CompletableFuture.runAsync(() -> process(variable));
runAsync.join();
assertEquals(100, variable.get());
}
public void process(AtomicInteger variable) {
System.out.println(Thread.currentThread() + " Process...");
variable.set(100);
}
バックグラウンドでタスクを非同期で実行し、タスクの処理結果を取得する必要がある場合は、
supplyAsync
メソッドを使用して、Supplier
タイプのパラメータを受信してCompletableFuture
を返すことができます.@Test
public void testCompletableFutureSupplyAsync() {
CompletableFuture supplyAsync = CompletableFuture.supplyAsync(this::process);
try {
assertEquals("Hello mghio", supplyAsync.get()); // Blocking
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
public String process() {
return "Hello mghio";
}
ここで質問があるかもしれませんが、
runAsync
とsupplyAsync
のタスクを実行するスレッドはどこから来たのか、誰が作成したのか.実際にはJava 8のparallelStream
と同様であり、CompletableFuture
もグローバルForkJoinPool.commonPool()
から得られたスレッドからこれらのタスクを実行します.また、上記の2つの方法では、カスタムスレッドプールを使用してタスクを実行する方法も提供されています.実際には、CompletableFuture
のソースコードを理解すると、API
のすべての方法にリロードされたバージョンがあり、カスタムExecutor
アクチュエータがあるかどうかがわかります.@Test
public void testCompletableFutureSupplyAsyncWithExecutor() {
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
CompletableFuture supplyAsync = CompletableFuture.supplyAsync(this::process, newFixedThreadPool);
try {
assertEquals("Hello mghio", supplyAsync.get()); // Blocking
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
public String process() {
return "Hello mghio";
}
チェーン呼び出しと結果集約処理
CompletableFuture
のget()
メソッドは、結果が得られるまで
メソッドを継続することを知っています.CompletableFuture
は、thenApply
メソッド、thenAccept
メソッド、およびthenRun
メソッドなどを提供して、このような状況を回避し、タスク完了後のコールバック通知を追加することもできます.これらの方法の使用シーンは次のとおりです.Future
から値を受信したタスクの前にカスタムビジネスコードを実行し、そのタスクにいくつかの値を返す場合は、Future
からいくつかの値を受信した後にタスクを実行する前に、結果値を返すことなくカスタムビジネスコードを実行したい場合は、@Test
public void testCompletableFutureThenApply() {
Integer notificationId = CompletableFuture.supplyAsync(this::thenApplyProcess)
.thenApply(this::thenApplyNotify) // Non Blocking
.join();
assertEquals(new Integer(1), notificationId);
}
@Test
public void testCompletableFutureThenAccept() {
CompletableFuture.supplyAsync(this::processVariable)
.thenAccept(this::thenAcceptNotify) // Non Blocking
.join();
assertEquals(100, variable.get());
}
@Test
public void testCompletableFutureThenRun() {
CompletableFuture.supplyAsync(this::processVariable)
.thenRun(this::thenRunNotify)
.join();
assertEquals(100, variable.get());
}
private String processVariable() {
variable.set(100);
return "success";
}
private void thenRunNotify() {
System.out.println("thenRun completed notify ....");
}
private Integer thenApplyNotify(Integer integer) {
return integer;
}
private void thenAcceptNotify(String s) {
System.out.println(
String.format("Thread %s completed notify ....", Thread.currentThread().getName()));
}
public Integer thenApplyProcess() {
return 1;
}
大量の非同期計算がある場合は、あるコールバックから別のコールバックに値を渡し続けることができます.つまり、チェーン呼び出し方式を使用すると、簡単に使用できます.
@Test
public void testCompletableFutureThenApplyAccept() {
CompletableFuture.supplyAsync(this::findAccountNumber)
.thenApply(this::calculateBalance)
.thenApply(this::notifyBalance)
.thenAccept((i) -> notifyByEmail()).join();
}
private void notifyByEmail() {
// business code
System.out.println("send notify by email ...");
}
private Double notifyBalance(Double d) {
// business code
System.out.println(String.format("your balance is $%s", d));
return 9527D;
}
private Double calculateBalance(Object o) {
// business code
return 9527D;
}
private Double findAccountNumber() {
// business code
return 9527D;
}
注意深い友人は、前のいくつかの方法の例の中で、すべての方法が同じスレッド上で実行されていることに気づくかもしれません.これらのタスクを個別のスレッドで実行する場合は、これらのメソッドに対応する非同期バージョンを使用できます.
@Test
public void testCompletableFutureApplyAsync() {
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
ScheduledExecutorService newSingleThreadScheduledExecutor = Executors
.newSingleThreadScheduledExecutor();
CompletableFuture completableFuture =
CompletableFuture
.supplyAsync(this::findAccountNumber,
newFixedThreadPool) // newFixedThreadPool
.thenApplyAsync(this::calculateBalance,
newSingleThreadScheduledExecutor)
.thenApplyAsync(this::notifyBalance);
Double balance = completableFuture.join();
assertEquals(9527D, balance);
}
結果処理の実行
thenCompose
メソッドは、アカウント残高を計算するビジネスなど、依存性のあるタスク処理に適しています.まず、アカウントを見つけてから、そのアカウントの残高を計算し、計算が完了してから通知を送信します.これらのタスクはすべて、前のタスクの戻りCompletableFuture
結果に依存します.この場合、thenCompose
メソッドを使用する必要があります.Java 8ストリームのflatMap
操作に似ています.@Test
public void testCompletableFutureThenCompose() {
Double balance = this.doFindAccountNumber()
.thenCompose(this::doCalculateBalance)
.thenCompose(this::doSendNotifyBalance).join();
assertEquals(9527D, balance);
}
private CompletableFuture doSendNotifyBalance(Double aDouble) {
sleepSeconds(2);
// business code
System.out.println(String.format("%s doSendNotifyBalance ....", Thread.currentThread().getName()));
return CompletableFuture.completedFuture(9527D);
}
private CompletableFuture doCalculateBalance(Double d) {
sleepSeconds(2);
// business code
System.out.println(String.format("%s doCalculateBalance ....", Thread.currentThread().getName()));
return CompletableFuture.completedFuture(9527D);
}
private CompletableFuture doFindAccountNumber() {
sleepSeconds(2);
// business code
System.out.println(String.format("%s doFindAccountNumber ....", Thread.currentThread().getName()));
return CompletableFuture.completedFuture(9527D);
}
private void sleepSeconds(int timeout) {
try {
TimeUnit.SECONDS.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
thenCombine
メソッドは、主に複数の独立したタスクをマージするための処理結果である.人の名前と住所を検索する必要があると仮定すると、異なるタスクを使用してそれぞれ取得し、その人の完全な情報(名前+住所)を取得するには、この2つの方法の結果を統合する必要があります.では、thenCombine
方法を使用することができます.@Test
public void testCompletableFutureThenCombine() {
CompletableFuture thenCombine = this.findName().thenCombine(this.findAddress(), (name, address) -> name + address);
String personInfo = thenCombine.join();
assertEquals("mghio Shanghai, China", personInfo);
}
private CompletableFuture findAddress() {
return CompletableFuture.supplyAsync(() -> {
sleepSeconds(2);
// business code
return "Shanghai, China";
});
}
private CompletableFuture findName() {
return CompletableFuture.supplyAsync(() -> {
sleepSeconds(2);
// business code
return "mghio ";
});
}
複数のタスクの実行完了を待つ
多くの場合、複数のタスクを並列に実行し、すべてのタスクが完了した後に処理します.3人の異なるユーザーの名前を検索し、結果をマージするとします.この場合、
CompletableFuture
の静的メソッドallOf
を使用することができます.このメソッドは、すべてのタスクの完了を待つことになります.このメソッドは、すべてのタスクのマージ結果を返さないので、タスクの実行結果を手動で組み合わせる必要があります.@Test
public void testCompletableFutureAllof() {
List> list = Lists.newArrayListWithCapacity(4);
IntStream.range(0, 3).forEach(num -> list.add(findName(num)));
CompletableFuture allFuture = CompletableFuture
.allOf(list.toArray(new CompletableFuture[0]));
CompletableFuture> allFutureList = allFuture
.thenApply(val -> list.stream().map(CompletableFuture::join).collect(Collectors.toList()));
CompletableFuture futureHavingAllValues = allFutureList
.thenApply(fn -> String.join("", fn));
String result = futureHavingAllValues.join();
assertEquals("mghio0mghio1mghio2", result);
}
private CompletableFuture findName(int num) {
return CompletableFuture.supplyAsync(() -> {
sleepSeconds(2);
// business code
return "mghio" + num;
});
}
例外処理
マルチスレッドではプログラム異常は実際にはうまく処理されていませんが、幸いなことに
CompletableFuture
では便利な異常処理方法を提供してくれました.@Test
public void testCompletableFutureThenCompose() {
Double balance = this.doFindAccountNumber()
.thenCompose(this::doCalculateBalance)
.thenCompose(this::doSendNotifyBalance).join();
}
上記のコードでは、3つのメソッド
doFindAccountNumber
、doCalculateBalance
およびdoSendNotifyBalance
のいずれかに異常が発生した場合、その後に呼び出されるメソッドは実行されません.CompletableFuture
は、exceptionally
、handle
およびwhenComplete
メソッドの3つの処理異常を提供する.1つ目の方法はexceptionally
メソッドを使用して異常を処理し、前のメソッドが失敗して異常が発生した場合、異常コールバックが呼び出されます.@Test
public void testCompletableFutureExceptionally() {
CompletableFuture thenApply = CompletableFuture.supplyAsync(this::findAccountNumber)
.thenApply(this::calculateBalance)
.thenApply(this::notifyBalance)
.exceptionally(ex -> {
System.out.println("Exception " + ex.getMessage());
return 0D;
});
Double join = thenApply.join();
assertEquals(9527D, join);
}
第2の方法は
handle
メソッドを使用して異常を処理することであり、このメソッドを使用して異常を処理することは、上記のexceptionally
メソッドよりも柔軟であり、異常オブジェクトと現在の処理結果を同時に取得することができる.@Test
public void testCompletableFutureHandle() {
CompletableFuture.supplyAsync(this::findAccountNumber)
.thenApply(this::calculateBalance)
.thenApply(this::notifyBalance)
.handle((ok, ex) -> {
System.out.println(" ...");
if (ok != null) {
System.out.println("No Exception !!");
} else {
System.out.println("Exception " + ex.getMessage());
return -1D;
}
return ok;
});
}
3つ目は
whenComplete
メソッドを使用して異常を処理することである.@Test
public void testCompletableFutureWhenComplete() {
CompletableFuture.supplyAsync(this::findAccountNumber)
.thenApply(this::calculateBalance)
.thenApply(this::notifyBalance)
.whenComplete((result, ex) -> {
System.out.println("result = " + result + ", ex = " + ex);
System.out.println(" ...");
});
}
まとめ
本文では、
CompletableFuture
クラスの一部の方法と使用方法を紹介した.このクラスの方法の多くは同時に提供される機能も非常に強く、非同期プログラミングで使用されることが多い.基本的な使用方法を熟知した後、ソース分析の原理を深く理解するか、それとも深く理解するか.