【並行すること】スレッド秩序化神器CompletionService
ある日、製品マネージャーが突然魚を触っているあなたを見つけました.製品:『集約検索機能を追加し、ユーザーが私たちのサイトで商品を検索するとき、私たちはそれぞれA、B、Cの3つのサイトからこの情報を検索して、それから得た結果をユーザーに返します』あなた:『ああ、爬虫類を書いて、3つのサイトからデータを捕まえるのではないでしょうか?』製品:『ふふ、爬虫類は法律違反だ.これはデータ分析と呼ばれている.どうすれば、実現できるだろうか.』あなた:『できる』制品:『はい、明日オンライン』あなた:『..』
Code 1.0
あなたはすぐに開発を完了しました.コードは以下の通りです.
/*
*
* * *
* * * blog.coder4j.cn
* * * Copyright (C) B0A6-B0B0 All Rights Reserved.
* *
*
*/
package cn.coder4j.study.example.thread;
import cn.hutool.core.thread.ThreadUtil;
import com.google.common.collect.Lists;
import java.util.List;
/**
* @author buhao
* @version TestCompletionService.java, v 0.A B0B0-0B-A8 A9:0C buhao
*/
public class TestCompletionService {
public static void main(String[] args) {
//
String queryName = "java";
//
long startTime = System.currentTimeMillis();
List result = queryInfoCode1(queryName);
System.out.println(" : " + (System.currentTimeMillis() - startTime));
System.out.println(result);
}
/**
* code 1
*
* @param queryName
* @return
*/
private static List queryInfoCode1(String queryName) {
List resultList = Lists.newArrayList();
String webA = searchWebA(queryName);
resultList.add(webA);
String webB = searchWebB(queryName);
resultList.add(webB);
String webC = searchWebC(queryName);
resultList.add(webC);
return resultList;
}
/**
* A
*
* @param name
* @return
*/
public static String searchWebA(String name) {
ThreadUtil.sleep(5000);
return "webA";
}
/**
* B
*
* @param name
* @return
*/
public static String searchWebB(String name) {
ThreadUtil.sleep(3000);
return "webB";
}
/**
* C
*
* @param name
* @return
*/
public static String searchWebC(String name) {
ThreadUtil.sleep(500);
return "webC";
}
}
コードを実行すると、次のようになります.
: 8512
[webA, webB, webC]
私が行きます.どうやって8秒以上お願いしますか?オンラインになったのに,製品はまだ私を切らない.
debugはコードを見て、要求されたサイトに問題が発生したことを発見しました.
/**
* A
*
* @param name
* @return
*/
public static String searchWebA(String name) {
ThreadUtil.sleep(5000);
return "webA";
}
/**
* B
*
* @param name
* @return
*/
public static String searchWebB(String name) {
ThreadUtil.sleep(3000);
return "webB";
}
/**
* C
*
* @param name
* @return
*/
public static String searchWebC(String name) {
ThreadUtil.sleep(500);
return "webC";
}
ウェブサイトA、ウェブサイトBは長年修理していないため、誰もメンテナンスしていないで、インタフェースの応答はとても遅くて、平均応答時間は1つは5秒で、1つは3秒です(ここではsleepシミュレーションを使います).ウェブサイトCの性能は更に良くて、平均応答時間は0.5秒です.私達のプログラムの実行時間はウェブサイトA応答時間+ウェブサイトB応答時間+ウェブサイトC応答時間です.
Code 2.0
はい、問題は分かりました.要求されたサイトが遅すぎるので、どうやって解決しますか.彼らに電話してサイトを最適化して私を登らせることはできません.本では私たちにまず自分から問題を探すように教えてくれた.まず、自分のコードがどこで最適化できるかを見てみましょう.
コードを分析すると、私たちのコードはすべてシリアル化されていて、Aサイトが要求し終わって、Bサイトが要求して、Bサイトが要求してからCサイトが要求されています.急に効率を高める第一の要義を思いつき,コードの並列率を高める.なぜA、B、Cの3つのサイトが一緒に要求するのではなく、1つ1つのシリアル要求が必要なのか、Javaのマルチスレッドは簡単に実現できます.コードは以下の通りです.
/*
*
* * *
* * * blog.coder4j.cn
* * * Copyright (C) B0A6-B0B0 All Rights Reserved.
* *
*
*/
package cn.coder4j.study.example.thread;
import cn.hutool.core.thread.ThreadUtil;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* @author buhao
* @version TestCompletionService.java, v 0.A B0B0-0B-A8 A9:0C buhao
*/
public class TestCompletionService {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//
String queryName = "java";
//
long startTime = System.currentTimeMillis();
List result = queryInfoCode2(queryName);
System.out.println(" : " + (System.currentTimeMillis() - startTime));
System.out.println(result);
}
/**
* code 1
*
* @param queryName
* @return
*/
private static List queryInfoCode1(String queryName) {
List resultList = Lists.newArrayList();
String webA = searchWebA(queryName);
resultList.add(webA);
String webB = searchWebB(queryName);
resultList.add(webB);
String webC = searchWebC(queryName);
resultList.add(webC);
return resultList;
}
/**
* code 2
*
* @param queryName
* @return
*/
private static List queryInfoCode2(String queryName) throws ExecutionException, InterruptedException {
List resultList = Lists.newArrayList();
// 3
ExecutorService pool = Executors.newFixedThreadPool(3);
try {
// feature
Future webAFuture = pool.submit(() -> searchWebA(queryName));
Future webBFuture = pool.submit(() -> searchWebB(queryName));
Future webCFuture = pool.submit(() -> searchWebC(queryName));
//
resultList.add(webAFuture.get());
resultList.add(webBFuture.get());
resultList.add(webCFuture.get());
} finally {
//
pool.shutdown();
}
return resultList;
}
/**
* A
*
* @param name
* @return
*/
public static String searchWebA(String name) {
ThreadUtil.sleep(5000);
return "webA";
}
/**
* B
*
* @param name
* @return
*/
public static String searchWebB(String name) {
ThreadUtil.sleep(3000);
return "webB";
}
/**
* C
*
* @param name
* @return
*/
public static String searchWebC(String name) {
ThreadUtil.sleep(500);
return "webC";
}
}
ここで重要なコードは次のとおりです.
/**
* code 2
*
* @param queryName
* @return
*/
private static List queryInfoCode2(String queryName) throws ExecutionException, InterruptedException {
List resultList = Lists.newArrayList();
// 3
ExecutorService pool = Executors.newFixedThreadPool(3);
try {
// feature
Future webAFuture = pool.submit(() -> searchWebA(queryName));
Future webBFuture = pool.submit(() -> searchWebB(queryName));
Future webCFuture = pool.submit(() -> searchWebC(queryName));
//
resultList.add(webAFuture.get());
resultList.add(webBFuture.get());
resultList.add(webCFuture.get());
} finally {
//
pool.shutdown();
}
return resultList;
}
リクエストサイトのコードは実際には1行も変わっていませんが、リクエスト方法を呼び出す場所になって、以前シリアルのコードをマルチスレッドの形式に変えて、しかも普通のマルチスレッドの形式ではありません.メインスレッドでスレッドの結果を得るため、Futureの形式を使います.(ここでは、以前の文書【同時性】スレッドを作成する3つの方法を参照することができる).
コードを実行して、効果を見て、結果は以下の通りです.
: 5058
[webA, webB, webC]
ええと、効果は明らかで、8秒余りから5秒余りに下がりましたが、まだ長くて、納得できません.追求のあるプログラマーとして、最適化しなければなりません.分析すると、最初のコードはシリアルで、流れは以下の通りで、総要求時間は3回の要求の総時間長である.
並列化されているため、バケツ効果のように、最も長い時間を決定する要因は、あなたの要求の中で最も時間がかかる操作であり、ここでは時間が5秒の要求Aサイトの操作である.
Code 3.0
ここまで分析すると,ABサイトのリクエスト時間を最適化できない前提では最適化が困難である.しかし、方法はいつも困難よりも多く、私たちは確かに総要求時間を圧縮することはできませんが、ユーザーにもっとよく体験させることができます.ここでは、WebsocketとCompletionServiceの2つの技術を導入する必要があります.ここでwebsocketは、クライアントのアクティブな要求を必要とせず、サービス側のアクティブなメッセージをアクティブにプッシュするサービス側プッシュ技術と簡単に理解できる(wsは本稿では重点ではなく、一筆書きで、具体的な実装は前文【websocket】spring boot統合websocketの4つの方法を参照することができる).
/*
*
* * *
* * * blog.coder4j.cn
* * * Copyright (C) B0A6-B0B0 All Rights Reserved.
* *
*
*/
package cn.coder4j.study.example.thread;
import cn.hutool.core.thread.ThreadUtil;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* @author buhao
* @version TestCompletionService.java, v 0.A B0B0-0B-A8 A9:0C buhao
*/
public class TestCompletionService {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//
String queryName = "java";
//
long startTime = System.currentTimeMillis();
queryInfoCode3(queryName);
System.out.println(" : " + (System.currentTimeMillis() - startTime));
}
/**
* code 1
*
* @param queryName
* @return
*/
private static List queryInfoCode1(String queryName) {
List resultList = Lists.newArrayList();
String webA = searchWebA(queryName);
resultList.add(webA);
String webB = searchWebB(queryName);
resultList.add(webB);
String webC = searchWebC(queryName);
resultList.add(webC);
return resultList;
}
/**
* code 2
*
* @param queryName
* @return
*/
private static List queryInfoCode2(String queryName) throws ExecutionException, InterruptedException {
List resultList = Lists.newArrayList();
// 3
ExecutorService pool = Executors.newFixedThreadPool(3);
try {
// feature
Future webAFuture = pool.submit(() -> searchWebA(queryName));
Future webBFuture = pool.submit(() -> searchWebB(queryName));
Future webCFuture = pool.submit(() -> searchWebC(queryName));
//
resultList.add(webAFuture.get());
resultList.add(webBFuture.get());
resultList.add(webCFuture.get());
} finally {
//
pool.shutdown();
}
return resultList;
}
/**
* code 3
*
* @param queryName
* @return
*/
private static void queryInfoCode3(String queryName) throws ExecutionException, InterruptedException {
//
long startTime = System.currentTimeMillis();
// CompletionService
ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(Executors.newFixedThreadPool(3));
// feature
executorCompletionService.submit(() -> searchWebA(queryName));
executorCompletionService.submit(() -> searchWebB(queryName));
executorCompletionService.submit(() -> searchWebC(queryName));
for (int i = 0; i < 3; i++) {
Future take = executorCompletionService.take();
System.out.println(" -> " + take.get());
System.out.println(" ws , " + (System.currentTimeMillis() - startTime));
}
}
/**
* A
*
* @param name
* @return
*/
public static String searchWebA(String name) {
ThreadUtil.sleep(5000);
return "webA";
}
/**
* B
*
* @param name
* @return
*/
public static String searchWebB(String name) {
ThreadUtil.sleep(3000);
return "webB";
}
/**
* C
*
* @param name
* @return
*/
public static String searchWebC(String name) {
ThreadUtil.sleep(500);
return "webC";
}
}
コアコードは次のとおりです.
/**
* code 3
*
* @param queryName
* @return
*/
private static void queryInfoCode3(String queryName) throws ExecutionException, InterruptedException {
//
long startTime = System.currentTimeMillis();
// CompletionService
ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(Executors.newFixedThreadPool(3));
// feature
executorCompletionService.submit(() -> searchWebA(queryName));
executorCompletionService.submit(() -> searchWebB(queryName));
executorCompletionService.submit(() -> searchWebC(queryName));
for (int i = 0; i < 3; i++) {
Future take = executorCompletionService.take();
System.out.println(" -> " + take.get());
System.out.println(" ws , " + (System.currentTimeMillis() - startTime));
}
}
まず、実行結果を見てください.
-> webC
ws , 561
-> webB
ws , 3055
-> webA
ws , 5060
: 5060
実行結果を分析してみると、まず総時間が5秒以上かかっても変わっていないが、すべて実行してからクライアントにプッシュするのではなく、実行が終わったらプッシュする法則を発見し、最初にプッシュするのは要求が最も速く、それから2番目に速く、最後に最も遅いものである.すなわち,プッシュ結果は秩序化されている.ユーザーに与える体験は、ボタンをクリックすると、1秒以内にサイトCのデータが表示され、2秒後には元のベースにガイドサイトBのデータが追加され、2秒後にはサイトAのデータが追加されます.この体験は、ユーザーが5秒間白画面にしてから、すべてのデータを一気に返すよりもずっと良いです.
不思議なことに、この背後にある功労者はCompletionServiceで、彼のソースは以下の通りです.
package java.util.concurrent;
/**
* A service that decouples the production of new asynchronous tasks
* from the consumption of the results of completed tasks. Producers
* {@code submit} tasks for execution. Consumers {@code take}
* completed tasks and process their results in the order they
* complete. A {@code CompletionService} can for example be used to
* manage asynchronous I/O, in which tasks that perform reads are
* submitted in one part of a program or system, and then acted upon
* in a different part of the program when the reads complete,
* possibly in a different order than they were requested.
*
* Typically, a {@code CompletionService} relies on a separate
* {@link Executor} to actually execute the tasks, in which case the
* {@code CompletionService} only manages an internal completion
* queue. The {@link ExecutorCompletionService} class provides an
* implementation of this approach.
*
*
Memory consistency effects: Actions in a thread prior to
* submitting a task to a {@code CompletionService}
* happen-before
* actions taken by that task, which in turn happen-before
* actions following a successful return from the corresponding {@code take()}.
*/
public interface CompletionService {
/**
* Submits a value-returning task for execution and returns a Future
* representing the pending results of the task. Upon completion,
* this task may be taken or polled.
*
* @param task the task to submit
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
Future submit(Callable task);
/**
* Submits a Runnable task for execution and returns a Future
* representing that task. Upon completion, this task may be
* taken or polled.
*
* @param task the task to submit
* @param result the result to return upon successful completion
* @return a Future representing pending completion of the task,
* and whose {@code get()} method will return the given
* result value upon completion
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
Future submit(Runnable task, V result);
/**
* Retrieves and removes the Future representing the next
* completed task, waiting if none are yet present.
*
* @return the Future representing the next completed task
* @throws InterruptedException if interrupted while waiting
*/
Future take() throws InterruptedException;
/**
* Retrieves and removes the Future representing the next
* completed task, or {@code null} if none are present.
*
* @return the Future representing the next completed task, or
* {@code null} if none are present
*/
Future poll();
/**
* Retrieves and removes the Future representing the next
* completed task, waiting if necessary up to the specified wait
* time if none are yet present.
*
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the Future representing the next completed task or
* {@code null} if the specified waiting time elapses
* before one is present
* @throws InterruptedException if interrupted while waiting
*/
Future poll(long timeout, TimeUnit unit) throws InterruptedException;
}
CompletionServiceメソッドは次のように表示されます.
submitはCallableオブジェクトをコミットし、結果を得るスレッドタスクをコミットするために使用されます.
submitは、上のsubmitと同様にRunnableオブジェクトとresultオブジェクトをコミットするために使用されますが、runnableの戻り値voidではスレッドの結果が得られないため、パラメータとしてのブリッジとしてresultが追加されました.
takeは、最新のスレッド実行結果を取り出すために使用されます.ここではブロックされていることに注意してください.
takeは最新のスレッド実行結果を取り出すために使用され、非ブロックであり、結果がなければnullを返します.
同じように、タイムアウト時間を追加しただけです.
また、CompletionServiceはインタフェースであり、直接使用することはできません.通常、彼の実装クラスExecutorCompletionServiceを使用しています.具体的な使用方法は上記のdemoのようです.
ここを見るとExecutorCompletionServiceの実現原理が気になるかもしれませんが、実は原理は簡単です.彼は内部でブロックキューを維持しています.提出したタスクは、先に実行した先進的なキューに入ります.だから、pollやtakeを通じて得たのは、最初に実行したタスクの結果に違いありません.
その他
1.プロジェクトコード
紙面が限られているため、すべてのコードを貼り終えることができません.問題が発生したら、githubにソースコードを表示します.
について
私の个人の公众号KIWIの砕けた考えに関心を持って、関心を持ってから福祉に返事して、大量の学习の内容は无料で分かち合います!
私の个人の公众号KIWIの砕けた念に関心を持つことを歓迎して、関心を持つ后に学习の资料に返事して、大量の学习の内容は直接分かち合います!