シリーズの同時プログラミングをマスターする-10.Fork/Joinフレームワーク

4954 ワード

高同時性、高可用性アーキテクチャの把握
第二課同時プログラミング
この授業から同時プログラミングの内容を学びます.主に、同時プログラミングの基礎知識、ロック、メモリモデル、スレッドプール、各種同時コンテナの使用について説明します.
第十節Fork/JoinフレームワークFork/Join ForkJoinPool
基本思想.ThreadPoolExecutorスレッドプール内の各タスクは、単一のスレッドによって独立して処理されます.非常に時間のかかるタスクが発生すると、スレッドプールに1つのスレッドだけがこの大きなタスクを処理していますが、他のスレッドは空いています.これはCPU負荷のバランシングを引き起こす.ForkJoinPoolで、1つの大きなタスクを複数の小さなタスクに分割し、forkを使用して他のスレッドに小さなタスクを配布して同時に処理し、joinを使用して小さなタスクの実行結果をまとめます.マルチプロセッサの利点を利用して、使用可能なすべての処理能力を集中して実行効率を向上させることが、分而治の考え方の並列実現です.
きほんげんりForkJoinPoolExecutorServiceインタフェースの実装クラスである.ForkJoinPoolの2つのコアは、ワーク・スティリンク・アルゴリズムとを区別して処理することです.
さぎょう盗難アルゴリズム
アルゴリズムの考え方:
  • 各スレッドには独自のWorkQueueがあり、両端キュー
  • です.
  • キューは3つの機能をサポートし、push、pop、poll
  • push/popはキューの所有者スレッドでのみ使用でき、pollは他のスレッドで
  • を呼び出すことができる.
  • で区切られたサブタスクがforkを呼び出すと、スレッドのキューに
  • がpushされます.
  • デフォルトでは、スレッドは自分のキューからタスクを取得し、
  • を実行する.
  • 自分のキューが空である場合、poll盗みタスク
  • をランダムに別のスレッドのキューの末尾から呼び出す.
    ForkJoinPoolオブジェクトの作成
  • 呼び出しExecutorsツールクラス
  • // parallelism      
    public static ExecutorService newWorkStealingPool(int parallelism);
    // Runtime.getRuntime().availableProcessors()     
    public static ExecutorService newWorkStealingPool();
  • ForkJoinPool内部メソッドcommonPool()
  • を呼び出す
    public static ForkJoinPool commonPool();
  • 呼び出しコンストラクタ
  • ForkJoinTask
    ほとんどの場合、私たちはForkJoinTaskからFormJoinPoolに提出しています.
    以下はForkJoinTaskの3つのコアメソッドです.
  • fork()は、大きなタスクが小さなタスクに分割された後、小さなタスクを呼び出すfork()メソッドは、スレッドプールにタスクを入れることができる
  • である.
  • join()は、小さなタスクのjoin()メソッドを呼び出してタスクの戻り結果を待つ.サブタスクが異常を投げ出すと、joinも異常を投げ出す.メソッドquietlyJoin()は、異常を投げ出すも結果を返さず、getException()およびgetResult()
  • を呼び出す必要がある.
  • invoke()は、現在のスレッドでタスク
  • を同期実行する.RecursiveActionおよびRecursiveTask通常、ForkJoinTaskを直接使用するのではなく、2つの抽象クラスを使用します.
  • RecursiveAction:戻り値のないタスク
  • RecursiveTask:戻り値のあるタスク
  • public class RecursiveActionTeset {
    
        static class Sorter extends RecursiveAction {
    
            public static void sort(long[] array) {
                ForkJoinPool.commonPool().invoke(new Sorter(array, 0, array.length));
            }
    
            private final long[] array;
            private final int lo, hi;
    
            public Sorter(long[] array, int lo, int hi) {
                this.array = array;
                this.lo = lo;
                this.hi = hi;
            }
    
            private static final int THRESHOLD = 1000;
    
            //         
            @Override
            protected void compute() {
                if (hi - lo < 1000) {
                    Arrays.sort(array, lo, hi);
                } else {
                    int mid = (hi + lo) >>> 1;
                    //     1000 ,        
                    Sorter left = new Sorter(array, lo, mid);
                    Sorter right = new Sorter(array, mid, hi);
                    invokeAll(left, right);
    
                    merge(lo, mid, hi);
                }
            }
    
            private void merge(int lo, int mid, int hi) {
                long[] buff = Arrays.copyOfRange(array, lo, mid);
                for (int i = 0, j = lo, k = mid; i < buff.length; i++) {
                    if (k == hi || buff[i] < array[k]) {
                        array[j] = buff[i++];
                    } else {
                        array[j] = array[k++];
                    }
                }
            }
    
            public static void main(String[] args) {
                long[] array = new Random().longs(100_0000).toArray();
                Sorter.sort(array);
                System.out.println(Arrays.toString(array));
            }
        }
    }
    public class BatchInsertTask extends RecursiveTask {
        //      
        List records;
    
        public BatchInsertTask(List records) {
            this.records = records;
        }
    
        @Override
        protected Integer compute() {
            //         5,     
            if (records.size() < 5) {
                return computeDirectly();
            } else {
                //            5,       
                int size = records.size();
    
                //     
                BatchInsertTask aTask = new BatchInsertTask(records.subList(0, size / 2));
                //     
                BatchInsertTask bTask = new BatchInsertTask(records.subList(size / 2, records.size()));
                //          
                invokeAll(aTask, bTask);
                //             
                return aTask.join() + bTask.join();
            }
        }
    
        /**
         *          
         */
        private int computeDirectly() {
            try {
                Thread.sleep((long) (records.size() * 1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("   :" + Arrays.toString(records.toArray()));
            return records.size();
        }
    }