JAva同時プログラミング学習6-並列ストリーム


【コンセプト
パラレルストリームとは,コンテンツを複数のデータブロックに分割し,各データブロックを異なるスレッドでそれぞれ処理するストリームである.Java 7の前に、データを並列に処理するのは面倒です.第一に、データを含むデータ構造をいくつかのサブ部分に明確に分ける必要があります.第2に、各サブセクションに独立したスレッドを割り当てる.第三に、適切なタイミングで同期を行い、データ競合による問題を回避し、最後に各サブセクションの結果を統合する.Java 7にはforkjoinフレームワークが導入されていますが、java 8のstreamインタフェースは、データを灰を吹くことなく並列処理することができます.streamインタフェースの背後にはforkjoinフレームワークが使用されています.ただし、シーケンスストリームに対してparallel()を呼び出すことは、ストリーム自体に変化があることを意味しない.内部には実際にbooleanフラグが設定されており、parallel()以降の操作を並列に実行したいことを示しています.同様に、sequential()を使用してパラレルストリームをシーケンスストリームに変更できます.この2つの方法は,制御フローをより細かくすることができる.
eg.java 8でのstreamの使用:
//    
public static long sum(long n){
    return Stream.iterate(1l,i -> i + 1)
            .limit(n)
            .reduce(0l,Long::sum);
}

//    
public static long parallelSum(long n){
    return Stream.iterate(1l,i -> i + 1)
            .limit(n)
            //       
            .parallel()
            .reduce(0l,Long::sum);
}

【並列流路プールの構成
パラレルストリームの内部にはデフォルトのforkjoinPoolが使用され、デフォルトのスレッド数はプロセッサの数(仮想カーネルを含む)であり、Runtime.getRuntime()を通過する.availableProcessors()を入手します.通過:System.setProperty(「java.util.concurrent.ForkJoinPool.common.parallelism」,「12」)は、スレッドプールのサイズを変更します.
【性能テスト
マルチスレッドがシーケンス実行よりも効率的であることは当然ではありません.次の例を見てみましょう.
public class Exercise {

    public static void main(String[] args) {
        long num = 1000_000_0;

        long st = System.currentTimeMillis();
        System.out.println("iterate  " + sum(num) + ":" +(System.currentTimeMillis() - st));

        st = System.currentTimeMillis();
        System.out.println("iterate  " + parallelSum(num) + ":" +(System.currentTimeMillis() - st));

        st = System.currentTimeMillis();
        System.out.println("  " + forSum(num) + ":" +(System.currentTimeMillis() - st));

        st = System.currentTimeMillis();
        System.out.println("LongStream  " + longStreamParallelSum(num) + ":" +(System.currentTimeMillis() - st));

        st = System.currentTimeMillis();
        System.out.println("LongStream  " + longStreamSum(num) + ":" +(System.currentTimeMillis() - st));
    }

    //    
    public static long sum(long n){
        return Stream.iterate(1l,i -> i + 1)
                .limit(n)
                .reduce(0l,Long::sum);
    }

    //    
    public static long parallelSum(long n){
        return Stream.iterate(1l,i -> i + 1)
                .limit(n)
                //       
                .parallel()
                .reduce(0l,Long::sum);
    }

    //    
    public static long forSum(long n){
        long result = 0;
        for(long i = 0 ;i <= n ; i++){
            result += i;
        }
        return result;
    }

    //longStream  
    public static long longStreamParallelSum(long n){
        return LongStream.rangeClosed(1,n)
                .parallel()
                .reduce(0l,Long::sum);
    }

    //longStream    
    public static long longStreamSum(long n){
        return LongStream.rangeClosed(1,n)
                .reduce(0l,Long::sum);
    }
}

パラレル・フローの実行時間は、シーケンス・フローと反復実行よりもずっと長くなります.2つの理由があります.
  • iterate()は箱詰めオブジェクトを生成し、箱を分解して合計しなければならない.
  • iterate()は、この関数を適用するたびに前のアプリケーションの結果に依存するため、複数の独立したブロックに分けて並列に実行することは難しい.数値リストは,帰納プロセスの開始時に準備ができていないため,ストリームを効率的に小さなブロックに分割して並列処理することはできない.しかし、ストリームはパラレル実行としてマークされ、シーケンス実行にオーバーヘッドが増加し、毎回の和を求める操作に新しいスレッドが開きます.

  • 【より的確な方法を用いる
    LongStream.rangeClosed():
        1.     long    ,      
        2.       ,          
        

    このことから,適切なデータ構造の選択は並列化アルゴリズムよりも重要であることが多い.並列には代価がある.パラレル・プロシージャは、ストリームを再帰的に分割し、各サブストリームのオペレーションを異なるスレッドに割り当て、これらのオペレーションの結果を1つの値にマージする必要があります.しかし、マルチコア間でデータを移動するコストは、私たちが想像していたよりも大きいため、再カーネルで並列に実行される作業時間がカーネル間でデータを伝送する時間よりも長いことを保証することが重要です.
    【パラレルフローの適切な使用
    パラレルストリームを誤って使用する主な原因は、使用するアルゴリズムが共有変数の状態を変更することです.共有変数を変更することは同期を意味し、同期メソッドを使用すると並列が意味しないためです.以下に、いくつかの推奨事項を示します.
    1.   ,                       。
    2.     ,    ,         ,java8   LongStream,IntStream,DoubleStream        。
    3.                ,  :limit,findFirst          。
    4.                 ,           ,             ,             。
    5.          。
    6.                  。ArrayList      LinkedList   ,                。  ,range                   。            :
       - ArrayList:  
       - LinkedList: 
       - IntStream :  
       - Stream.iterate: 
       - HashSet: 
       - TreeSet: 
    7.           ,           。
    8.           ,           。
    

    【パラレルを正しく使用
  • 高同時、タスク実行時間の短い業務で、スレッドプールのスレッド数はCPUコア数+1に設定でき、スレッドコンテキストの切り替え
  • を減らすことができる.
  • 合併が高くなく、タスクの実行時間が長い業務は区別しなければならない.
  • 業務時間が長くIO操作に集中している場合、つまりIO密集型の任務であり、IO操作はCPUを占有していないため、すべてのCPUを暇にさせないで、スレッドプールの中のスレッド数を大きくして、CPUにもっと多くの業務
  • を処理させることができる.
  • ビジネス時間が計算操作に集中している場合、つまり計算が密集しているタスクであれば、これは仕方がありません.(1)と同じように、スレッドプール内のスレッド数を少なく設定し、スレッドコンテキストの切り替え
  • を減らします.
  • 同時多発高、業務実行時間長、このタイプの任務を解決する鍵はスレッドプールではなく全体のアーキテクチャの設計にあり、これらの業務の中のいくつかのデータがキャッシュできるかどうかを見るのが第一歩であり、サーバーを増やすのが第二歩であり、スレッドプールの設定については、設定リファレンス(2).最後に、ビジネスの実行時間が長いという問題も、ミドルウェアを使用してタスクを分割およびデカップリングできるかどうかを分析する必要があります.