java.util.concurrentマルチスレッド並列処理は処理結果を返す(listセットを計算する)
java.util.concurrentマルチスレッド並列処理は処理結果を返す(listセットを計算する)
通常、主なタスク表現としてRunnableを使用しますが、Runnableは非常に限られた抽象であり、runメソッドではログを記録したり、印刷したりすることができます.あるいは、あるコンテナにデータをまとめて(一方ではメモリ消費量が多く、一方では同期を制御する必要があり、効率が非常に大きいという制約がある)、つまり実行結果を返すことができない;例えば、同じ時間に1000個のタスクがネットワーク上でデータを捕まえて、それから捕まえたデータを処理する(処理方式が不定)、最も良い方法はコールバックインタフェースを提供して、処理の方式を最もコールバックに伝達することです;しかし今私達はもっと良い方法が実現しました:CompletionService+Callable CompletionServiceはExecutor(ラインプール)とBlockingQueue(列を塞ぐ)結合して、同じ時Callableを任務の基本ユニットとして使用して、全体の過程は生産者が絶えずCallable任務を塞ぐことに入れて、Executorは消費者として絶えず任務を取り出して運行して、そして結果に戻ります;優勢:a、列を塞ぐのはメモリの中で列を塞いで待つ任務が多すぎることを防止して、メモリのオーバーフローをもたらしますb、CompletionServiceは、どのタスクが先に実行されるか、順番に戻るのではなく、先に実行されたタスクが戻ってくることで、効率を大幅に向上させることができます.
次のコードはlistコレクション配布マルチスレッドシリアル計算を実現し、計算結果を返します.
通常、主なタスク表現としてRunnableを使用しますが、Runnableは非常に限られた抽象であり、runメソッドではログを記録したり、印刷したりすることができます.あるいは、あるコンテナにデータをまとめて(一方ではメモリ消費量が多く、一方では同期を制御する必要があり、効率が非常に大きいという制約がある)、つまり実行結果を返すことができない;例えば、同じ時間に1000個のタスクがネットワーク上でデータを捕まえて、それから捕まえたデータを処理する(処理方式が不定)、最も良い方法はコールバックインタフェースを提供して、処理の方式を最もコールバックに伝達することです;しかし今私達はもっと良い方法が実現しました:CompletionService+Callable CompletionServiceはExecutor(ラインプール)とBlockingQueue(列を塞ぐ)結合して、同じ時Callableを任務の基本ユニットとして使用して、全体の過程は生産者が絶えずCallable任務を塞ぐことに入れて、Executorは消費者として絶えず任務を取り出して運行して、そして結果に戻ります;優勢:a、列を塞ぐのはメモリの中で列を塞いで待つ任務が多すぎることを防止して、メモリのオーバーフローをもたらしますb、CompletionServiceは、どのタスクが先に実行されるか、順番に戻るのではなく、先に実行されたタスクが戻ってくることで、効率を大幅に向上させることができます.
次のコードはlistコレクション配布マルチスレッドシリアル計算を実現し、計算結果を返します.
package com.project.service.service.impl;
import com.google.common.collect.Lists;
import lombok.Getter;
import lombok.Setter;
import java.util.List;
import java.util.concurrent.*;
public class Test {
/**
* list
*
* @param args
* @throws Exception
*/
public static void main (String[]args) throws Exception{
// list
List list = Lists.newArrayList();
for(int i =1;i<=21;i++){
Student student=new Student(i,"test-"+i);
list.add(student);
}
//
ExecutorService exec = Executors.newFixedThreadPool(10);
CompletionService<List<Student>> cpiService =new ExecutorCompletionService<>(exec);
int f=0;
int i =0 ;
//
while (true){
f=f+1;
int g=(i+10)>list.size()?(list.size()):(i+10);
testCallable callable=new testCallable(f,list.subList(i,g));
if(!exec.isShutdown()){
cpiService.submit(callable);
}
i=(g);
if(i>=(list.size()))
break;
}
System.out.println("f:"+f+";i:"+i+";size:"+list.size());
//
for(int h=0;h<f;h++){
List<Student> students=cpiService.take().get();
for(Student student:students){
System.out.println("result-"+h+"-["+student+"]");
}
}
//
exec.shutdown();
}
@Getter
@Setter
static class Student {
private int id ;
private String name ;
public Student(int id, String name) {
this.id = id;
this.name = name;
}
public Student() {
}
@Override
public String toString() {
return id+"-student{" +
"id=" + id +
", name='" + name + '\'' +
'}';
}
}
//
static class testCallable implements Callable<List<Student>>{
private int flag;
private List<Student> students;
public testCallable(int flag, List<Student> students) {
this.flag = flag;
this.students = students;
}
public testCallable() {
}
@Override
public List<Student> call() throws Exception {
students.stream().forEach(e->printStudent(flag,e));
return students;
}
public void printStudent(int i,Student student ){
System.out.println("call-"+i+"-["+student+"]");
}
}
}