JAvaコンカレントツールクラス:CountDownLatch、CyclicBarrier、Exchanger

6000 ワード

引用する


jdk1.5以降、java concurrentパッケージにはいくつかの同時ツールクラスが用意されています.この文書では、これらのツールクラスの使い方と使用シーンを整理します.
  • CountDownLatch:1つ以上のスレッドがブロックされ、別のスレッドが特定の操作を完了するまで待機します.
  • CyclicBarrier:複数のスレッドがブロックされて待機し、すべてのスレッドが障害点(barrier)に達するまで機能的にCountDownLatchに近い.最大の違いはCountDownLatchのカウントダウンが1回しか有効にならないことであり、CyclicBarrierはリサイクルできる.
  • Exchanger:2つのスレッドの同期器に対して、このスレッドのデータ交換を許可します.

  • CountDownLatchの使い方


    CountDownLatchを初期化するには、特定のcount値を指定する必要があります.awaitメソッドは、count値が0になるまでスレッドをブロックし、countDownメソッドを呼び出すとcount値を減らすことができます.CountDownLatchの一般的な使用法の1つは、メインスレッドで作業スレッドの実行を開始し、作業スレッドの実行が終了するのを待つことである.
    public class CountDownLatch1 {
        public static void main(String[] args) {
            try {
                CountDownLatch startSignal = new CountDownLatch(1);
                CountDownLatch doneSignal = new CountDownLatch(5);
                for (int i = 0; i < 5; ++i) {
                    new Thread(new Worker(startSignal, doneSignal)).start();
                }
                //  
                System.out.println(" ...");
                startSignal.countDown();
                //  
                System.out.println(" ...");
                //  
                doneSignal.await();
                System.out.println(" ...");
            } catch (InterruptedException ex) {
            }
        }
    }
    class Worker implements Runnable {
        private final CountDownLatch startSignal;
        private final CountDownLatch doneSignal;
        Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
            this.startSignal = startSignal;
            this.doneSignal = doneSignal;
        }
        @Override
        public void run() {
            try {
                startSignal.await();
                //  
                Thread.sleep(100);
            } catch (InterruptedException ex) {
            } finally {
                System.out.println(Thread.currentThread().getName() + " ");
                doneSignal.countDown();
            }
        }
    }
    

    作業スレッドの実行開始...他のプロセスを実行...Thread-1実行終了Thread-3実行終了Thread-2実行終了Thread-4実行終了Thread-0実行終了ワークスレッド終了...
    もう1つの方法は、1つの大きなタスクを複数のサブタスクに分割することです.各サブタスクは新しいスレッドで実行され、すべてのサブスレッドが実行されると、大きなタスクの実行が終了することを意味します.この方式は分治法の実現に非常に適しており,各サブタスク間に依存関係はない.
    public class CountDownLatch2 {
        public static void main(String[] args) {
            try {
                List numberList = Arrays.asList(1,2,3,4,5);
                CountDownLatch doneSignal = new CountDownLatch(5);
                Executor e = Executors.newFixedThreadPool(5);
                for (int i = 0; i < 5; ++i) {
                    e.execute(new WorkerRunnable(doneSignal, i, numberList));
                }
                doneSignal.await();
                System.out.println(" ");
            } catch (InterruptedException ex) {
            }
        }
    }
    class WorkerRunnable implements Runnable {
        private final CountDownLatch doneSignal;
        private final int i;
        private final List numberList;
        WorkerRunnable(CountDownLatch doneSignal, int i, List numberList) {
            this.doneSignal = doneSignal;
            this.i = i;
            this.numberList = numberList;
        }
        @Override
        public void run() {
            doWork(i);
            doneSignal.countDown();
        }
    
        void doWork(int i) {
            System.out.println(Thread.currentThread().getName() + " " + numberList.get(i));
        }
    }
    

    pool-1-thread-1印刷1 pool-1-thread-5印刷5 pool-1-thread-4印刷4 pool-1-thread-3印刷3 pool-1-thread-2印刷2ワークスレッド実行終了

    CyclicBarrierの使い方


    CyclicBarrierは、スレッドがあり、互いに待つ必要があるシーンに非常に適しています.CyclicBarrierは、文字通り、ループバリアを意味し、ここでのバリアは、スレッドがawaitメソッドを呼び出すことによってブロック待ちをもたらすことを意味する.CyclicBarrierには2つのコンストラクタ方法があります.
  • CyclicBarrier(int parties):partiesは、このバリア上でawaitメソッドを呼び出してバリアに到達するスレッドがどれだけ必要かを指定します.
  • CyclicBarrier(int parties,Runnable barrierAction):この構築方法は、前のRunnableタイプのパラメータよりも多く、最後のスレッドがバリアに到達すると、このスレッドタスクを実行します.このスレッドタスクにより、すべてのスレッドが到着したときにステータス変更などの機能を完了できます.
  • class Solver {
        final int N;    // 
        final float[][] data;    // 
        final CyclicBarrier barrier;    // 
    
        class Worker implements Runnable {
            int myRow;
            Worker(int row) { myRow = row; }
            public void run() {
                while (!done()) {
                    processRow(myRow);    // 
                    try {
                        barrier.await();     // 
                    } catch (InterruptedException ex) { 
                        return; 
                    } catch (BrokenBarrierException ex) {
                        return; 
                    }
                }
            }
        }
    
        public Solver(float[][] matrix) {
            data = matrix;
            N = matrix.length;
            // CyclicBarrier
            barrier = new CyclicBarrier(N, new Runnable() {
                                               public void run() { 
                                                 mergeRows(); // 
                                               }
                                           });
            for (int i = 0; i < N; ++i) 
                new Thread(new Worker(i)).start();
            waitUntilDone();
        }
    }
    

    Exchangerの使い方

    public class ExchangerDemo {
        public static void main(String[] args) {
            Exchanger exchanger = new Exchanger();
            Thread thread1 = new Thread(new Worker(exchanger, "thread1"));
            Thread thread2 = new Thread(new Worker(exchanger, "thread2"));
            thread1.setName("thread1");
            thread2.setName("thread2");
            thread1.start();
            thread2.start();
        }
    }
    class Worker implements Runnable {
        private String value;
        private final Exchanger exchanger;
        public Worker(Exchanger exchanger, String value) {
            this.exchanger = exchanger;
            this.value = value;
        }
        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + ": " + exchanger.exchange(value));
            } catch (InterruptedException ex) {
            }
        }
    }