Java応答式プログラミングのCompleable Futureメモ(一)
14969 ワード
本文は主にCompleable Futureの基本的な使い方を紹介します。後の文章の中で簡単なデモを書きます。
1.完成したCompleable Futureを作成する
この場合は通常計算の開始段階で使用します。
タスクが完了すると戻り値が得られ、呼び出しが完了していない場合は設定のデフォルト値(null)に戻ります。
2.同期タスクを実行する
この方法は新しいCopletion Stageを返し、操作関数の戻り値を汎型パラメータとして返します。
thenAppley Ayncと thenAppplyの違いは前者が非同期で任務を遂行することです。
Compleable Futureの方法の中でAyncのサフィックスのは全部非同期です。
Compleable Futureは、非同期でタスクを実行する場合、Exectorを指定しない場合、非同期で実行します。 Fork JoinPool 実現する
非同期操作の時に、私達は使います。 ジョン 任務の完成を待つ
タスクの実行方法は、指定されたExectorを使って実行できます。
次の段階で現在の段階の結果を受信しました。戻り値を使用しない場合は、thenAcceptを使用できます。
handleを使って異常時の処理方法を定義します。この処理も非同期のhandleAsyncです。
whenComplettee 操作には2つのパラメータが必要です。1番目のパラメータは戻り値の2番目の異常情報です。
呼び出しで キャンセルしたり、使ったりします。 compleete Exceptionally(new CancellationException()
appyToEitherは実行が一つであれば終了します。同時に実行が完了したら、現在のタスクを使って結果を返します。appyToEitherメソッドで指定されたタスクの返却結果ではありません。
acceptEitherと appyToEither 違いは前者の戻り値の種類がないことです。
runAfterBothが戻り値なしで現在のタスクと指定されたタスクが完了した時にのみ指定操作が実行されます。
thenAcctBoth 戻り値なしで、この方法は、2つのタスクの戻り値を指定動作のパラメータとします。
thenComponeは戻り値があります。 ,この方法は、2つのタスクの戻り値を指定動作のパラメータとします。
thenCompose 戻り値があります ,この方法は、現在のジョブの戻り値を指定動作のパラメータとします。
anyOfは戻り値があり、タスクが完了したら(異常を含む)全体のタスクが完了します。
allOf すべてのタスクが実行中に完了しました。すべてのタスクが同期動作である場合 allOfで指定された動作も同期 である。すべてのタスクが非同期である場合 allOfで指定された操作も非同期の です。
非同期をテストしています allOf JVMはタスクの非同期を最適化することができます。
タスクの実行時間が短く、現在のスレッドが忙しくない場合、タスクは同期して実行されます。
テストコードを貼り付けます。 みんなも同じ結果かどうか分かりません。タスクに遅延がない タスク遅延0.001 s
1.完成したCompleable Futureを作成する
この場合は通常計算の開始段階で使用します。
CompletableFuture cf = CompletableFuture.completedFuture("message"); //
assertTrue(cf.isDone()); //
assertEquals("message", cf.getNow(null)); //
cf.get Now(null)タスクが完了すると戻り値が得られ、呼び出しが完了していない場合は設定のデフォルト値(null)に戻ります。
2.同期タスクを実行する
この方法は新しいCopletion Stageを返し、操作関数の戻り値を汎型パラメータとして返します。
long main_thread_id = Thread.currentThread().getId();
CompletableFuture cf = CompletableFuture.completedFuture("message").thenApply(s -> {
assertEquals(main_thread_id, Thread.currentThread().getId());
assertFalse(Thread.currentThread().isDaemon());
return s.toUpperCase(); // String String
});
assertEquals("MESSAGE", cf.getNow(null));
3.非同期タスクを実行するthenAppley Ayncと thenAppplyの違いは前者が非同期で任務を遂行することです。
Compleable Futureの方法の中でAyncのサフィックスのは全部非同期です。
Compleable Futureは、非同期でタスクを実行する場合、Exectorを指定しない場合、非同期で実行します。 Fork JoinPool 実現する
非同期操作の時に、私達は使います。 ジョン 任務の完成を待つ
long main_thread_id = Thread.currentThread().getId();
CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
assertNotEquals(main_thread_id, Thread.currentThread().getId()); // id
// ForkJoinPool ,
assertTrue(Thread.currentThread().isDaemon());
return s.toUpperCase();
});
assertNull(cf.getNow(null)); // null
assertEquals("MESSAGE", cf.join()); // join
4.Exectorを使って非同期的にタスクを実行するタスクの実行方法は、指定されたExectorを使って実行できます。
ExecutorService executorService = Executors.newFixedThreadPool(3, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "HHH");
}
});
CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
assertEquals("HHH", Thread.currentThread().getName()); //
assertFalse(Thread.currentThread().isDaemon()); //
return s.toUpperCase();
}, executorService); // executorService
assertNull(cf.getNow(null));
assertEquals("MESSAGE", cf.join()); // join
5.消費移転次の段階で現在の段階の結果を受信しました。戻り値を使用しない場合は、thenAcceptを使用できます。
StringBuffer result = new StringBuffer();
//
CompletableFuture cf = CompletableFuture.completedFuture("message").thenAcceptAsync(result::append);
assertEquals(" result 0", 0, result.length());
cf.join(); //
assertEquals(" result 7", 7, result.length());
6.異常処理handleを使って異常時の処理方法を定義します。この処理も非同期のhandleAsyncです。
CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
int a = 1 / 0; //
return s;
});
// CompletableFuture cf.completeExceptionally(e)
//
CompletableFuture exceptionHandler = cf.handle((s, th) -> (th != null) ? " " : "");
try {
cf.join();
fail(" ");
} catch (CompletionException ex) { // just for testing
assertEquals("/ by zero", ex.getCause().getMessage()); // 0
}
assertEquals(" ", exceptionHandler.join());
私達もwhen Completteにいます。 異常を処理するには、この処理も非同期の可能性があります。whenComplettee 操作には2つのパラメータが必要です。1番目のパラメータは戻り値の2番目の異常情報です。
CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
int a = 1 / 0; //
return s;
}).whenComplete((v,th)->{
assertNotEquals(null,th); //
});
CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase).whenComplete((v,th)->{
assertNull(th); //
assertEquals("MESSAGE",v);
});
7.ジョブのキャンセル呼び出しで キャンセルしたり、使ったりします。 compleete Exceptionally(new CancellationException()
CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
fail(" ");
return s;
});
CompletableFuture cf2 = cf.exceptionally(e -> " ");
cf.cancel(true); //
//cf.completeExceptionally(new CancellationException()); //
assertTrue(cf.isCompletedExceptionally()); //
assertEquals(" ", cf2.join());
8.appyToEitherappyToEitherは実行が一つであれば終了します。同時に実行が完了したら、現在のタスクを使って結果を返します。appyToEitherメソッドで指定されたタスクの返却結果ではありません。
String original = "Message";
CompletableFuture cf1 = CompletableFuture.completedFuture(original)
.thenApplyAsync(s -> {
try {
Thread.sleep(1000); //
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(" ");
return s.toUpperCase();
});
/*
applyToEither
*/
CompletableFuture cf2 = cf1.applyToEither(
CompletableFuture.completedFuture(original).thenApplyAsync(s -> {
System.out.println(" ");
return s.toLowerCase();
}),
s -> s + " from applyToEither");
String result = cf2.join();
System.out.println(result);
assertTrue(result.endsWith(" from applyToEither"));
9. acceptEitheracceptEitherと appyToEither 違いは前者の戻り値の種類がないことです。
String original = "Message";
StringBuilder result = new StringBuilder();
CompletableFuture cf = CompletableFuture.completedFuture(original)
.thenApplyAsync(s -> {
System.out.println(" ");
return s.toUpperCase();
})
.acceptEither(CompletableFuture.completedFuture(original).thenApplyAsync(s -> {
System.out.println(" ");
return s.toLowerCase();
}),
s -> result.append(s).append("from acceptEither"));
cf.join();
assertTrue(result.toString().endsWith("from acceptEither"));
10.runAfterBothrunAfterBothが戻り値なしで現在のタスクと指定されたタスクが完了した時にのみ指定操作が実行されます。
String original = "Message";
StringBuilder result = new StringBuilder();
CompletableFuture.completedFuture(original).thenApply(s -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(" ");
return s.toUpperCase();
}).runAfterBoth(
CompletableFuture.completedFuture(original).thenApply(s -> {
System.out.println(" ");
return s.toLowerCase();
}),
() -> result.append("done"));
System.out.println(original);
assertTrue("Result was empty", result.length() > 0);
11.thenAcctBoththenAcctBoth 戻り値なしで、この方法は、2つのタスクの戻り値を指定動作のパラメータとします。
String original = "Message";
StringBuffer res = new StringBuffer();
CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).thenAcceptBoth(
CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
(s, s2) -> {
res.append(s).append(s2);
}
);
assertEquals("MESSAGEmessage", res.toString());
12 thenComponethenComponeは戻り値があります。 ,この方法は、2つのタスクの戻り値を指定動作のパラメータとします。
String original = "Message";
CompletableFuture res = CompletableFuture.completedFuture(original)
.thenApply(String::toUpperCase)
.thenCombine(
CompletableFuture.completedFuture(original)
.thenApply(String::toLowerCase),
(s, s2) -> s + s2);
assertEquals("MESSAGEmessage", res.getNow(null));
13.thenComposethenCompose 戻り値があります ,この方法は、現在のジョブの戻り値を指定動作のパラメータとします。
String original = "Message";
CompletableFuture res = CompletableFuture.completedFuture(original)
.thenApply(String::toUpperCase)
.thenCompose(
s -> CompletableFuture.completedFuture(original)
.thenApply(String::toLowerCase)
.thenApply(s2 -> s + s2));
assertEquals("MESSAGEmessage", res.getNow(null));
14.anyOfanyOfは戻り値があり、タスクが完了したら(異常を含む)全体のタスクが完了します。
StringBuilder result = new StringBuilder();
List messages = Arrays.asList("a", "b", "c");
CompletableFuture.anyOf(messages.stream()
.map(msg -> CompletableFuture.completedFuture(msg)
.thenApply(String::toUpperCase))
.toArray(CompletableFuture[]::new))
.whenComplete((res, th) -> {
if (th == null) {
assertTrue(isUpperCase(((String) res).charAt(0)));
result.append(res);
}
});
assertEquals(result.length(), 1);
StringBuilder result_except = new StringBuilder();
CompletableFuture> cf = CompletableFuture.anyOf(messages.stream()
.map(msg -> CompletableFuture.completedFuture(msg)
.thenApply(s -> {
if (s.equals("a")) {
int a = 1 / 0;
}
return s.toUpperCase();
}))
.toArray(CompletableFuture>[]::new))
.whenComplete((res, th) -> {
if (th == null) {
assertFalse(isUpperCase(((String) res).charAt(0)));
result_except.append(res);
}
});
CompletableFuture exceptionHandler = cf.handle((s, th) -> (th != null) ? " " : "");
assertEquals(result_except.length(), 0);
assertEquals(exceptionHandler.join(), " ");
15.allOf allOf すべてのタスクが実行中に完了しました。
List messages = Arrays.asList("a", "b", "c");
StringBuilder result = new StringBuilder();
List> futures = messages.stream()
.map(msg -> CompletableFuture.completedFuture(msg)
.thenApply(String::toUpperCase))
.collect(Collectors.toList());
long main_id = Thread.currentThread().getId();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.whenComplete((v, th) -> {
assertEquals(main_id,Thread.currentThread().getId());
if (th == null) { //
futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null).charAt(0))));
result.append("done");
}
});
assertEquals("done", result.toString());
List messages = Arrays.asList("a", "b", "c");
StringBuilder result = new StringBuilder();
List> futures = messages.stream()
.map(msg -> CompletableFuture.completedFuture(msg)
.thenApplyAsync(s -> {
try {
Thread.sleep(1000); // allof
} catch (InterruptedException e) {
e.printStackTrace();
}
return s.toUpperCase();
}))
.collect(Collectors.toList());
CompletableFuture cf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.whenComplete((v, th) -> {
System.out.println(Thread.currentThread().getName());
futures.forEach(f -> assertTrue(isUpperCase(f.getNow(null).charAt(0))));
result.append("done");
});
assertEquals(0, result.length());
cf.join();
assertEquals("done", result.toString());
スレ主の質問非同期をテストしています allOf JVMはタスクの非同期を最適化することができます。
タスクの実行時間が短く、現在のスレッドが忙しくない場合、タスクは同期して実行されます。
テストコードを貼り付けます。 みんなも同じ結果かどうか分かりません。
//
public void test18() {
int count = 0; //
for (int i = 0; i < 1000; i++) {
if (test_allof_no_dealy()) count++;
}
System.out.println(" :" + count);
}
public boolean test_allof_no_dealy() {
List messages = Arrays.asList("a", "b", "c");
StringBuilder result = new StringBuilder();
List> futures = messages.stream()
.map(msg -> CompletableFuture.completedFuture(msg)
.thenApplyAsync(String::toUpperCase))
.collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.whenComplete((v, th) -> {
assertFalse(Thread.currentThread().isDaemon()); //
futures.forEach(f -> assertTrue(isUpperCase(f.getNow(null).charAt(0))));
result.append("done");
});
if (result.length() == 0) return true; //
return false; //
}
複数実行の結果: 非同期が発生した回数:512 非同期の出現回数:483// 0.001s
public void test19() {
int count = 0; //
for (int i = 0; i < 1000; i++) {
if (test_allof_dealy()) count++;
}
System.out.println(" :" + count);
}
public boolean test_allof_dealy() {
List messages = Arrays.asList("a", "b", "c");
StringBuilder result = new StringBuilder();
List> futures = messages.stream()
.map(msg -> CompletableFuture.completedFuture(msg)
.thenApplyAsync(s -> {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s.toUpperCase();
}))
.collect(Collectors.toList());
CompletableFuture cf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.whenComplete((v, th) -> {
futures.forEach(f -> assertTrue(isUpperCase(f.getNow(null).charAt(0))));
result.append("done");
});
if (result.length() == 0) return true; //
return false; //
}
複数実行の結果: 非同期が発生した回数:1000 非同期が発生した回数:1000