ExectorServiceを使ったデータの紛失問題を調べる.
35480 ワード
記事の目次.ExectorServiceを用いたデータ損失問題 .
ExectorServiceを使ったデータの紛失問題を調べる.
本記事では、スレッド池を使ってマルチスレッド業務を行うにはどうすればいいかを紹介していますが、スレッド池を使う時には注意すべき点がたくさんあります.ここではスレッド池の油断によるデータ結果が予想に合わない例を紹介します.
ExectorServiceを使ったデータの紛失問題を調べる.
本記事では、スレッド池を使ってマルチスレッド業務を行うにはどうすればいいかを紹介していますが、スレッド池を使う時には注意すべき点がたくさんあります.ここではスレッド池の油断によるデータ結果が予想に合わない例を紹介します.
DataQo dataQo= new DataQo();
List<DataResult> listResult = new ArrayList<>();
//
ExecutorService executorInsert = Executors.newFixedThreadPool(dataList.size());
CountDownLatch latch = new CountDownLatch(dataList.size());
try {
//
List<Future<String>> futureList = new ArrayList<Future<String>>();
for (Data data: dataList) {
dataQo.setRelativeId(data.getId());
Future<String> futureListTop = executorInsert.submit(new CalculateQueryThread(dataQo,latch));
futureList.add(futureListTop);
}
latch.await();
//
executorInsert.shutdown();
if(Detect.notEmpty(futureList)){
for (Future<String> future : futureList) {
if(null != future){
String listResultStr = future.get();
if(Detect.notEmpty(listResultStr)){
List<DataResult> listResultEle = (List<DataResult>) SerializableUtils.UnserializeStringToObject(listResultStr);
if(Detect.notEmpty(listResultEle)){
listResult.addAll(listResultEle);
}
}
}
}
}
}catch (Exception e) {
log.error("QueryData;error-msg:{}", e);
} finally {
//
if(!executorInsert.isShutdown()){
executorInsert.shutdown();
}
}
CalculateCategory TopThread.java/**
*
*
* @author huzekun
*/
public class CalculateQueryThreadextends CalculateThread<List<CmsProgramCategoryRelative>>{
private static final Logger LOGGER = Logger.getLogger(CalculateCategoryTopThread.class);
private BusinessService businessService = BeanFactoryUtils.getInstance("businessService");
protected DataQo dataQo;
protected CountDownLatch latch;
@Override
public String call() throws Exception {
try {
LOGGER.debug("==========CalculateQueryThreadextends ===========");
List<DataResult> listData= businessService .listDataByQo(dataQo);
return SerializableUtils.SerializeObjectToString(listData);
} catch (Exception e) {
e.printStackTrace();
LOGGER.error("==========CalculateQueryThreadextends ==========="+e.getMessage());
}finally {
latch.countDown();
}
return null;
}
public CalculateQueryThread() {
super();
}
public CalculateQueryThread(DataQo dataQo, CountDownLatch latch) {
this.dataQo= dataQo;
this.latch = latch;
}
public CalculateQueryThread(BusinessService businessService) {
super();
this.businessService = businessService ;
}
}
ここでまずこのコードを見ます.業務は簡単です.主にマルチスレッドを開いて、パラメータによってデータを要求してからつなぎ合わせます.簡単な業務ですが、多くの落とし穴があります.上には最初のコードを使っていますが、私達は直接運転しています.最後に必要なデータは三つしかありませんでした.実は十個のデータがあります.その後、DEBUG
を通じて調整を行い、予定通り10個のデータを得ました.ここでは、プログラムが直接走る時に、あるスレッドのデータクエリを落としたために、最終的なデータがなくなりましたか?ここではexecutorInsert.isTerminated()
を使ってスレッドグループが全部実行されているかどうかを確認し、スレッド漏れ実行問題を回避しましたが、ここでは各スレッドが実行に成功していることが分かります. DataQo dataQo= new DataQo();
List<DataResult> listResult = new ArrayList<>();
//
ExecutorService executorInsert = Executors.newFixedThreadPool(dataList.size());
CountDownLatch latch = new CountDownLatch(dataList.size());
try {
//
List<Future<String>> futureList = new ArrayList<Future<String>>();
for (Data data: dataList) {
dataQo.setRelativeId(data.getId());
Future<String> futureListTop = executorInsert.submit(new CalculateQueryThread(dataQo,latch));
futureList.add(futureListTop);
}
latch.await();
//
executorInsert.shutdown();
while(true){
if(executorInsert.isTerminated()){
if(Detect.notEmpty(futureList)){
for (Future<String> future : futureList) {
if(null != future){
String listResultStr = future.get();
if(Detect.notEmpty(listResultStr)){
List<DataResult> listResultEle = (List<DataResult>) SerializableUtils.UnserializeStringToObject(listResultStr);
if(Detect.notEmpty(listResultEle)){
listResult.addAll(listResultEle);
}
}
}
}
}
break;
}
Thread.sleep(200);
}
}catch (Exception e) {
log.error("QueryData;error-msg:{}", e);
} finally {
//
if(!executorInsert.isShutdown()){
executorInsert.shutdown();
}
}
スレッド漏れ実行の問題に加えて、ここではスレッド実行トラフィックbusinessService .listDataByQo(dataQo)
にチェックポイントを移動し、ログ印刷を記録パラメータと重要な実行プロセス情報に追加しました.案の定、ここでは現在の業務問題の実行回数が正しいことが分かりますが、何度も実行しているうちにパラメータは確かに同じです.霧は基本的に解けています.問題は伝達パラメータの上にあります.DataQo dataQo= new DataQo()
を作成してパラメータオブジェクトを循環添加スレッドに置きます.スレッドごとに新しいパラメータオブジェクトが独立しています.問題は解決されます. List<DataResult> listResult = new ArrayList<>();
//
ExecutorService executorInsert = Executors.newFixedThreadPool(dataList.size());
CountDownLatch latch = new CountDownLatch(dataList.size());
try {
//
List<Future<String>> futureList = new ArrayList<Future<String>>();
for (Data data: dataList) {
DataQo dataQo= new DataQo();
dataQo.setRelativeId(data.getId());
Future<String> futureListTop = executorInsert.submit(new CalculateQueryThread(dataQo,latch));
futureList.add(futureListTop);
}
//
executorInsert.shutdown();
if(Detect.notEmpty(futureList)){
for (Future<String> future : futureList) {
if(null != future){
String listResultStr = future.get();
if(Detect.notEmpty(listResultStr)){
List<DataResult> listResultEle = (List<DataResult>) SerializableUtils.UnserializeStringToObject(listResultStr);
if(Detect.notEmpty(listResultEle)){
listResult.addAll(listResultEle);
}
}
}
}
}
}catch (Exception e) {
log.error("QueryData;error-msg:{}", e);
} finally {
//
if(!executorInsert.isShutdown()){
executorInsert.shutdown();
}
}
複数のスレッドが同時に実行される場合、異なるreqパラメータを宣言する必要があり、同一のオブジェクトは使用できません.そうでなければ、スレッドが安全でないとデータ共有になります.複数のスレッドは同じメモリに使用されています.つまり、スレッドごとに使用されているreqは最終的に同じです.例えば、上記dataQo.setRelativeId(data.getId())
は[1,2,3,4,5]
を設定し、再宣言がないたびに最後の複数のスレッドが[5,5,5,5,5]
を使用してスレッドを実行し、一つのデータが失われたという仮説を生成する.これにより、マルチスレッドを使用する際には技術面だけでなく、より全面的に検討する必要があり、そうでないとミスをしやすいことがわかります.