4.3 ForkJoin_集約タスクの結果

3936 ワード

ForkJoin集約タスクの結果
概要
タスクが解決すべき問題が所定のサイズより大きい場合は、この問題を複数のサブタスクに分割し、Fork/Joinを使用してこれらのサブタスクを実行する必要があります.実行が完了すると、元のスレッドは、これらのサブタスクによって生成された結果を取得し、結果を集計します.前の章では、サブタスクに分割してFork/Joinを使用して実行する方法について説明しました.この章では、サブタスクの結果をマージする方法について説明します.
このセクションでは、すべての製品の価格合計を計算する例を説明します.1つの計算タスクで製品の価格合計を計算し、タスクが製品の数を10より多く計算する必要がある場合は、これらの要素を2つの部分に分割し、invokeAllメソッドを呼び出して分割されたタスクを実行し、RecursiveTaskのgetメソッドを呼び出して分割タスクの計算結果を取得し、タスクの計算結果を加算して返します.タスクで計算する製品の数が10未満の場合は、直接価格の合計を計算して返します.
サンプルコードは次のとおりです.
public class JoinDemo {
    public static void main(String[] args){
        //      
        System.out.println("main:      ");
        List products = new ArrayList(50000);
        double value = 0;
        for(int i=0; i<50000; i++){
            Product p = new Product();
            double price = (100*Math.random());
            value += price;
            p.setPrice(price);
            products.add(p);
        }
        System.out.println("main:        :" + value);
        //  TaskDemo         
        SumPriceTask task = new SumPriceTask(products,0,products.size());

        //  ForkJoinPool  ,   
        System.out.println("main:  ForkJoinPool  ,   ");
        ForkJoinPool pool = new ForkJoinPool();
        pool.execute(task);

        System.out.println("main:       ");
        do{
            System.out.println("main:    :" + pool.getActiveThreadCount());
            System.out.println("main:    :" + pool.getStealCount());
            System.out.println("main:    :" + pool.getParallelism());

            try {
                TimeUnit.MICROSECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }while(!task.isDone());

        //     
        System.out.println("main:     ");
        pool.shutdown();
        //        
        try {
            pool.awaitTermination(1,TimeUnit.HOURS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //      
        try {
            double sum = task.get();
            System.out.println("main:         :" + sum);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        System.out.println("main:  ");
    }
}

class SumPriceTask extends RecursiveTask{

    private List products;
    private int first;
    private int last;

    SumPriceTask(List products, int first, int last) {
        this.products = products;
        this.first = first;
        this.last = last;
    }

    @Override
    protected Double compute() {
        if(last-first < 10){
            return sumPrice();
        }else{
            //    10   ,     
            int middle = (last+first)/2;
            SumPriceTask t1 = new SumPriceTask(products,first,middle+1);
            SumPriceTask t2 = new SumPriceTask(products,middle+1,last);
            invokeAll(t1,t2);
            double result = 0;
            try {
                result = t1.get() + t2.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            return result;
        }
    }

    /**        **/
    private double sumPrice(){
        double sum = 0;
        for(int i=first;i

プログラム実行ログは次のとおりです.
main:      
main:        :2495194.7098017232
main:  ForkJoinPool  ,   
main:       
main:    :1
main:    :0
main:    :2
main:    :2
main:    :0
main:    :2
main:    :2
main:    :0
main:    :2
main:    :2
main:    :0
main:    :2
main:     
main:         :2495194.7098017237
main: