JavaバッチExecutorServices/completionService

3560 ワード

サービス側は、複数の計算を同時に行うか、または他のサービスに要求を送信する必要があり、最終的に組立結果が上流に戻る要求を受信することが多い.ここでは、JDKがExcecutorServices/CompletionServicesに関連するいくつかの並列処理スキームを提供することについて説明します.実装するシーンは、リクエストにタイムアウト制限があり、すべての操作が計算された場合、すべてのアセンブリが返されます.そうでなければ、組み立て部分だけが完成した結果です.
1.前提
//   ,sleep               ,        。
public class MyTask implements Callable<Integer> {
    private int id;
    private int time;
    public MyTask(int i, int time) {
        this.id = i;
        this.time = time;
    }
    @Override
    public Integer call() throws Exception {
        Thread.sleep(time);
        return id;
    }
}
//   
ExecutorService threadPool = new ThreadPoolExecutor(10, 20, 
60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100));

2.タスクを1つずつ待つ
タスクをコミットした後、結果が戻るのを1つずつ待ちます.
//  150ms  
List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
futures.add(threadPool.submit(new MyTask(1, 60)));
futures.add(threadPool.submit(new MyTask(2, 150)));
Integer res = 0;

for (int i = 0; i < futures.size(); ++i) {
try {
Integer tmp = futures.get(i).get(50 + i * 50, TimeUnit.MILLESECONDS);
res += tmp;
} catch (Exception e) {
//nothing
}
}
System.out.println(res);

印刷結果は0で、実際に60 ms目に最初の計算が完了し、最初の結果を返すことができます.これは生産者と消費者が一つ一つ対応し、スケジューリングが不適切だからだ.
3.デカップリング待ち
生産者と消費者のデカップリングを1つのキューで実現し、生産者は結果を1つのキューに配置し、消費者はキューから結果を取り、タスクの提出順序に従って待たず、結果があれば消費する.CompletionServiceはExecutorと非同期キューのパッケージです.
CompletionService<Integer> service = new ExecutorCompletionService<Integer>(threadPool);
List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
futures.add(service.submit(new MyTask(1, 60)));
futures.add(service.submit(new MyTask(2, 150)));

List<Integer> result = new ArrayList<Integer>(futures.size());
for (int i = 0; i < futures.size(); ++i) {
    Future<Integer> future = service.poll(50 + i * 50, TimeUnit.MILLISECONDS);
    if (null != future){
        result.add(future.get());
    }
}
int res = 0;
for (Integer i : result) {
    res += null == i ? 0 : i;
}
System.out.println(res);

印刷結果は1、つまり最初の計算結果です.誰が先に完成したら誰を取りますか.
4.時間の活用
前の2つのシナリオでは、各タスクに対して一定の待機時間を設定し、両者の和はタイムアウト制限を超えない.これは時間を十分に利用していないが、2つのタスクは互いに影響を及ぼさず、それぞれが利用可能な時間はタイムアウト時間であり、両者の和がタイムアウト時間であるべきではない.最初のタスクに140時間、2番目のタスクに60時間がかかる場合、最初の2つのタスクは2番目のタスクの結果しか得られません.両方ともタイムアウトを150に設定すると、2つの結果が得られます.
List<MyTask> tasks = new ArrayList<MyTask>();
tasks.add(new MyTask(1, 140));
tasks.add(new MyTask(2, 60));
List<Future<Integer>> futures = threadPool.invokeAll(tasks, 150, TimeUnit.MILLISECONDS);

int res = 0;
for (Future<Integer> future : futures) {
    System.out.println("isDone:" + future.isDone());
    System.out.println("isCancel:" + future.isCancelled());
    if (future.isCancelled()) {
        continue;
    }
    res += future.get();
}
System.out.println(res);

印刷結果は3です.上記の方法はExecutorServiceのinvokeAllメソッドを利用しており,このメソッドはすべてのタスクが完了または待機している間に戻り,タイムアウト時にはまだ完了していないタスクがキャンセルされ,タスクがキャンセルされたか否かを判断することでタスクが計算完了したか否かを判断する.
この文章では,結果が互いに影響を及ぼさず,結果があれば返す応用シーンを述べている.結果が前後順にマージされる必要がある場合、または1つの計算が完了するのを待つだけで、シナリオ3は必ずしも必要ありません.