並列データ処理とパフォーマンス


概要


前号でJava 8に登場したブルーダを学び、ラムティーを大量に応用したStream APIを学びました.Stream APIは、Pipe構造処理によってデータを収集し、既存の論理を複数行(より高い可読性)に処理できる、容易に実現可能なコードフォーマットを提供する.並列計算(Parallel Method)もサポートしています.並列計算をサポートするParallel()メソッドはThread Poolを共有し、パフォーマンス障害を引き起こす可能性があるため、それを理解して使用することが重要です.今回のリリースでは、パラレルデータ処理とパフォーマンスの学習に時間がかかります.😄

0.Stream並列データ処理


0-1)Parallel演算:前置Java 8 VS Post Java 8


Java 8以前の並列処理方法を使用してThreadを作成し、ExecutorServiceを使用します.次に、ExecutorServiceを使用してサンプルコードを記述します.
Java 8以前のコード
List<Integer> numList = Arrays.asList(new Integer[]{1,2,3,4,5,6,7,8,9,10,11,12});

ExecutorService executor = Executors.newFixedThreadPool(4);
for (int i = 0; i < numList.size(); i++) {
    final int index = i;
    executor.submit(() -> {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName()
                + ", index=" + index + ", ended at " + new Date());
    });
}
executor.shutdown();
結果
pool-1-thread-1, index=0, ended at Mon Aug 23 16:39:50 KST 2021
pool-1-thread-3, index=2, ended at Mon Aug 23 16:39:50 KST 2021
pool-1-thread-4, index=3, ended at Mon Aug 23 16:39:50 KST 2021
pool-1-thread-2, index=1, ended at Mon Aug 23 16:39:50 KST 2021
pool-1-thread-2, index=6, ended at Mon Aug 23 16:39:56 KST 2021
pool-1-thread-3, index=4, ended at Mon Aug 23 16:39:56 KST 2021
pool-1-thread-4, index=5, ended at Mon Aug 23 16:39:56 KST 2021
pool-1-thread-1, index=7, ended at Mon Aug 23 16:39:56 KST 2021
pool-1-thread-4, index=10, ended at Mon Aug 23 16:40:01 KST 2021
pool-1-thread-3, index=9, ended at Mon Aug 23 16:40:01 KST 2021
pool-1-thread-2, index=8, ended at Mon Aug 23 16:40:01 KST 2021
pool-1-thread-1, index=11, ended at Mon Aug 23 16:40:01 KST 2021
12個のデータを含むリストに対して4個のThreadを用いてfor文を実行した.
Java 8以降のコード
List<Integer> numList = Arrays.asList(new Integer[]{1,2,3,4,5,6,7,8,9,10,11,12});

numList.parallelStream().forEach(index -> {
    System.out.println("Starting " + Thread.currentThread().getName()
            + ", index=" + index + ", " + new Date());
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) { }
});
結果
Starting ForkJoinPool.commonPool-worker-15, index=10, Mon Aug 23 17:22:50 KST 2021
Starting main, index=8, Mon Aug 23 17:22:50 KST 2021
Starting ForkJoinPool.commonPool-worker-11, index=2, Mon Aug 23 17:22:50 KST 2021
Starting ForkJoinPool.commonPool-worker-7, index=7, Mon Aug 23 17:22:50 KST 2021
Starting ForkJoinPool.commonPool-worker-5, index=11, Mon Aug 23 17:22:50 KST 2021
Starting ForkJoinPool.commonPool-worker-9, index=9, Mon Aug 23 17:22:50 KST 2021
Starting ForkJoinPool.commonPool-worker-3, index=4, Mon Aug 23 17:22:50 KST 2021
Starting ForkJoinPool.commonPool-worker-13, index=6, Mon Aug 23 17:22:50 KST 2021
Starting ForkJoinPool.commonPool-worker-15, index=12, Mon Aug 23 17:22:55 KST 2021
Starting ForkJoinPool.commonPool-worker-9, index=5, Mon Aug 23 17:22:55 KST 2021
Starting ForkJoinPool.commonPool-worker-7, index=1, Mon Aug 23 17:22:55 KST 2021
Starting ForkJoinPool.commonPool-worker-3, index=3, Mon Aug 23 17:22:55 KST 2021
その結果、メインスレッドを含む8つのスレッドが実行された.
8スレッドを使用する理由は次のとおりです.
Parallel Streamの内部では、fork-join poolがプロセッサごとにスレッドを使用するように設計されているため、Common fork join poolが使用されます.
筆者の運転環境はm 1 8コアチップであるため,8スレッドが動作していることがわかる.

0-2)Threadサイズ制御:フロントJava 8とリアJava 8


実行サービスThreadサイズの制御方式はJava 8以前の並列処理方式と同じである.
Java 8以前のコード
ExecutorService executor = Executors.newFixedThreadPool(5);
Java 8以降のParallel Streamでは、開発者は2つの方法でThreadのサイズを制御できます.
  • システム属性設定
  • Java 8以降のコード
    1.属性値の設定
    java.util.concurrent.ForkJoinPool.common.平行度のアトリビュート値を6に設定
    System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","6");
    結果
    Starting ForkJoinPool.commonPool-worker-5, index=4, Mon Aug 23 17:30:15 KST 2021
    Starting ForkJoinPool.commonPool-worker-9, index=9, Mon Aug 23 17:30:15 KST 2021
    Starting ForkJoinPool.commonPool-worker-3, index=11, Mon Aug 23 17:30:15 KST 2021
    Starting ForkJoinPool.commonPool-worker-13, index=12, Mon Aug 23 17:30:15 KST 2021
    Starting ForkJoinPool.commonPool-worker-11, index=10, Mon Aug 23 17:30:15 KST 2021
    Starting ForkJoinPool.commonPool-worker-7, index=7, Mon Aug 23 17:30:15 KST 2021
    Starting main, index=8, Mon Aug 23 17:30:15 KST 2021
    Starting ForkJoinPool.commonPool-worker-13, index=6, Mon Aug 23 17:30:20 KST 2021
    Starting ForkJoinPool.commonPool-worker-11, index=5, Mon Aug 23 17:30:20 KST 2021
    Starting ForkJoinPool.commonPool-worker-9, index=2, Mon Aug 23 17:30:20 KST 2021
    Starting ForkJoinPool.commonPool-worker-3, index=1, Mon Aug 23 17:30:20 KST 2021
    Starting ForkJoinPool.commonPool-worker-5, index=3, Mon Aug 23 17:30:20 KST 2021
    main Threadを含む6つのスレッドが実行されていることがわかります.
    2.ForkJoinPoolの使用
    ForkJoinPoolを作成する際,ねじの大きさの値を入力し,ねじの大きさを5に設定して実験を行った.
    List<Integer> numList = Arrays.asList(new Integer[]{1,2,3,4,5,6,7,8,9,10,11,12});
    
    ForkJoinPool forkjoinPool = new ForkJoinPool(5);
    forkjoinPool.submit(() -> {
        numList.parallelStream().forEach(index -> {
            System.out.println("Thread : " + Thread.currentThread().getName()
                    + ", index + " + new Date());
            try{
                Thread.sleep(5000);
            } catch (InterruptedException e){
            }
        });
    }).get();
    結果
    Thread : ForkJoinPool-1-worker-7, index + Mon Aug 23 17:34:23 KST 2021
    Thread : ForkJoinPool-1-worker-11, index + Mon Aug 23 17:34:23 KST 2021
    Thread : ForkJoinPool-1-worker-5, index + Mon Aug 23 17:34:23 KST 2021
    Thread : ForkJoinPool-1-worker-3, index + Mon Aug 23 17:34:23 KST 2021
    Thread : ForkJoinPool-1-worker-9, index + Mon Aug 23 17:34:23 KST 2021
    Thread : ForkJoinPool-1-worker-3, index + Mon Aug 23 17:34:29 KST 2021
    Thread : ForkJoinPool-1-worker-7, index + Mon Aug 23 17:34:29 KST 2021
    Thread : ForkJoinPool-1-worker-9, index + Mon Aug 23 17:34:29 KST 2021
    Thread : ForkJoinPool-1-worker-5, index + Mon Aug 23 17:34:29 KST 2021
    Thread : ForkJoinPool-1-worker-11, index + Mon Aug 23 17:34:29 KST 2021
    Thread : ForkJoinPool-1-worker-11, index + Mon Aug 23 17:34:34 KST 2021
    Thread : ForkJoinPool-1-worker-7, index + Mon Aug 23 17:34:34 KST 2021

    1.ForkJoinPoolの動作方法


    Parallel Streamで使用されているForkJoinPoolはJava 7に現れ、forkによって実行する必要があるタスクを分割し、joinによって分割されたタスクをマージした結果です.次の図は,分割‐征服アルゴリズムに類似したForkJoinPoolの挙動を示す.

    ForkJoinPool.class動作方式
    次のコードに示すように、ForkJoinPoolはAbstractExecutorServiceの実施者である.

    ForkJoinPool.class
    ForkJoinPoolは各スレッドにキューがあり、スレッドのTaskが1つもない場合、Task付きスレッドのTaskをインポートして処理することでCPUリソースの浪費を低減し、パフォーマンスを向上させる.

    ForkJoinPool Thread挙動

    2.並列計算の注意事項


    並列計算には多くの要因を考慮する必要がある.
    私たちは2つの主要な考慮事項について議論します.

    2−1)ForkJoinPoolによるTask分割のコストは低くなければならない.


    Parallel Streamは、Spliterator<T> trySplit()を使用してTaskを分割します.この方法では、分割されたデータ構造の分割にかかるコストが低く、並列計算の効果が得られます.
    Spliterator trySplit()メソッド
    public Spliterator<T> trySplit() {
        int lo = index, mid = (lo + fence) >>> 1;
        return (lo >= mid)
               ? null
               : new ArraySpliterator<>(array,
                                        lo, index = mid,
                                        characteristics);
    }
    例えば、分割するデータ構造がArrayやArrayListではなくリンクリストである場合、分割コストを低減し、並列計算の効率を向上させることができる.

    2-2)並列処理のタスクは独立していなければならない。


    この点は、パフォーマンス、速度、精度に関するパフォーマンスの問題を引き起こす可能性があります.たとえば、Streamが「ソート()」メソッドと「区別()」メソッドの中間演算を実行する場合、スレッド間で変数(同期)を共有するため、正確な結果値が得られない場合があります.すなわち、複数のスレッドがある場合は、可変オブジェクトにアクセスしないほうがよいので、バックシュート(:Javaでマルチスレッド環境を開発)を確認してください.🤭

    3.並列演算が必要な場合


    上記の2つの条件に基づいて、
  • Taskを区分するコストは低いはずです.
  • 並列処理のタスクは独立して実行する必要があります.
  • この場合、並列計算はメリットをもたらす可能性があります.ただし、これ以外にもパラレル計算に関する注意事項がたくさんありますので、パラレル計算を使用する場合は、これらの注意事項を考慮して使用してください.
    サンプルコード
    static int cnt = 0;
    
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        
      List<Integer> numList = Arrays.asList(new Integer[]{1,2,3,4,5,6});
    
      ForkJoinPool forkjoinPool = new ForkJoinPool(3);
      forkjoinPool.submit(() -> {
          numList.parallelStream().forEach((index) -> {
              for (int i = 0; i < 10000; i++) {
                  cnt++;
              }
          });
      }).get();
    
      System.out.println(cnt);
    }
    
    グローバル変数cntの値をnumList中のデータ要素の数として増加させ,最後に出力するコードとして3つのThreadを記述した.
    結果
    53631
    スレッドが単一スレッドの場合は60000が正常に出力されますが、3つのスレッドがある場合は、次のように出力の結果が正しくないことがわかります.

    4.回顧


    Parallel演算の場合、並列(並列)はもちろん性能が良いのではないでしょうか.栗を残してくれた勉強だと思います🤨
    個人的に感じたのは、流れが少ないときはやめましょう.
    また、どうしても使用しなければならない場合は、必ず運転速度テストを行ってから使用してください.背中みたいしかし当然の概念に対する批判?!考察できて嬉しいです.

    参考資料


    第7章並列データ処理と性能
    (7)並列データ処理と性能
    Java 8-Streamの並列処理
    Java 8 Parallel Stream、パフォーマンス障害に注意!