マルチスレッドプログラミング学習9(同時ツールクラス).

6229 ワード

CountDownLatch

  • CountDownLatchでは、1つ以上のスレッドが他のスレッドの完了操作を待つことができます.
  • CountDownLatchはjoinの代わりに機能し、より豊富な使い方を提供します.
  • CountDownLatchのcountDownメソッドは、Nが1減少します.CountDownLatchのawaitメソッドは、Nがゼロになるまで現在のスレッドをブロックします.
  • CountDownLatchオブジェクトの内部カウンタの値を再初期化または変更することはできません.
  • CountDownLatch内部はAQS共有ロックによって実現される.
  • public class CountDownLatchTest {
    
        private static final CountDownLatch DOWN_LATCH = new CountDownLatch(2);
    
        public static void main(String[] args) throws InterruptedException {
            new Thread(() -> {
                System.out.println(1);
                DOWN_LATCH.countDown();
                System.out.println(2);
                DOWN_LATCH.countDown();
    
            }).start();
            DOWN_LATCH.await();
            System.out.println("3");
        }
    }

    CyclicBarrier

  • CyclicBarrierは、同期ポイントと呼ばれるバリアを設定し、最後のスレッドがバリアに到達するまでバリアが開き、バリアによってブロックされたすべてのスレッドが動作し続ける.
  • CyclicBarrierのデフォルトの構造方法はCyclicBarrier(int parties)であり、そのパラメータはバリアブロックのスレッド数を表し、各スレッドはawaitメソッドを呼び出してCyclicBarrierにバリアに着いたことを伝え、現在のスレッドはブロックされている.
  • CyclicBarrierはまた、スレッドがバリアに到達したときにbarrier Actionを優先的に実行し、より複雑なビジネスシーンの処理を容易にするためのより高度な構造関数CyclicBarrier(int parties,Runnable barrier Action)を提供する.
  • getNumberWaiting法はCyclicBarrierブロックのスレッド数を得ることができる.isBroken()メソッドは、ブロックされたスレッドが中断されたかどうかを知るために使用されます.
  • CyclicBarrierのカウンタはreset()メソッドを使用してリセットできます(CountDownLatchのカウンタは一度しか使用できません).そのため、CyclicBarrierはより複雑なビジネスシーンを処理することができます.たとえば、計算にエラーが発生した場合は、カウンタをリセットし、スレッドを再実行できます.
  • CyclicBarrierは、マルチスレッド計算データに使用でき、最後に計算結果のシーンをマージします.
  • CyclicBarrier内部は再入ロックReentrantLockで実現されている.
  • public class BankWaterService implements Runnable {
     
        //  4 , run 
        private CyclicBarrier barrier = new CyclicBarrier(4, this);
        //  4 , 4 
        private Executor executor = Executors.newFixedThreadPool(4);
        //  
        private ConcurrentHashMap sheetBankWaterCount = new ConcurrentHashMap<>();
    
        private AtomicInteger atomicInteger = new AtomicInteger(1);
    
        private void count() {
            for (int i = 0; i < 4; i++) {
                Thread thread = new Thread(() -> {
                    //  , 
                    sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
                    //  , 
                    try {
                        barrier.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
    
                }, " " + atomicInteger.getAndIncrement());
                executor.execute(thread);
            }
        }
    
        @Override
        public void run() {
            int result = 0;
            //  
            for (Map.Entry sheet : sheetBankWaterCount.entrySet()) {
                result += sheet.getValue();
            }
            // 
            sheetBankWaterCount.put("result", result);
            System.out.println(result);
        }
    
        public static void main(String[] args) {
            BankWaterService bankWaterCount = new BankWaterService();
            bankWaterCount.count();
        }
    }

    Semaphore

  • Semaphore(信号量)は、特定のリソースに同時にアクセスするスレッドの数を制御するために使用され、各スレッドを調整することによって、共通のリソースの合理的な使用を保証する.
  • Semaphoreは、トラフィック制御、特に共通リソースの限られたアプリケーションシーン、例えばデータベース接続に使用することができる.
  • Semaphoreの構築方法Semaphore(int permits)は、使用可能なライセンス数を示す整数の数字を受け入れます.
  • 最初のスレッドは、Semaphoreのacquire()メソッドを使用してライセンスを取得し、使用後にrelease()メソッドを呼び出してライセンスを返します.tryAcquire()メソッドを使用してライセンスの取得を試みることもできます.
  • intavailablePermits():この信号量で現在使用可能なライセンス数を返します.
  • intgetQueueLength():ライセンスの取得を待機しているスレッドの数を返します.
  • booleanhasQueuedThreads():ライセンスの取得を待っているスレッドがあるかどうか.
  • Semaphore内部はAQS共有ロックを用いて実現される.
  • public class SemaphoreTest {
    
        private static final int THREAD_COUNT = 30;
        private static ExecutorService EXECUTOR = Executors.newFixedThreadPool(THREAD_COUNT);
        private static Semaphore SEMAPHORE = new Semaphore(10);
        private static AtomicInteger ATOMICINTEGER = new AtomicInteger(1);
    
        public static void main(String[] args) {
            for (int i = 0; i < THREAD_COUNT; i++) {
                EXECUTOR.execute(() -> {
                    try {
                        SEMAPHORE.acquire();
                        System.out.println("save data" + ATOMICINTEGER.getAndIncrement());
                        SEMAPHORE.release();
                    } catch (InterruptedException e) {
                    }
    
                });
            }
            EXECUTOR.shutdown();
        }
    }

    Exchanger

  • Exchanger(エクスチェンジ)は、スレッド間のデータ交換のためのスレッド間連携のためのツールクラスです.2つのスレッドが互いのデータを交換できる同期ポイントを提供します.この2つのスレッドはexchangeメソッドでデータを交換し、最初のスレッドが先にexchange()メソッドを実行すると、2番目のスレッドもexchangeメソッドを実行するのをずっと待っています.
  • は、Exchangerオブジェクトを単純に2つの格子を含むコンテナとして理解することができ、exchanger法により2つの格子に情報を埋め込むことができる.2つの格子の両方が埋め込まれると、オブジェクトは自動的に2つの格子の情報を交換し、スレッドに戻り、2つのスレッドの情報交換を実現します.
  • Exchangerは遺伝アルゴリズムに使用できます.(遺伝アルゴリズム:交配対象として2人を選択する必要がある場合,2人のデータを交換し,交配結果をクロスルールで導く)
  • .
  • Exchangerは、データを2人で同時に照合する必要があるなど、校正作業に使用できます.この場合、Exchangerを使用して2つの校正結果を比較できます.
  • Exchanger内部はロックなしCASで実現され、Exchangeは内部オブジェクトNodeの2つの属性であるitem、matchを使用し、2つのスレッドの値を分散して格納する.
  • public class ExchangerTest {
    
        private static final Exchanger exchange = new Exchanger<>();
        private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
    
        public static void main(String[] args) {
            threadPool.execute(() -> {
                try {
                    String result = exchange.exchange(" A");
                    System.out.println("A exchange :" + result);
                } catch (InterruptedException e) {
                }
    
            });
            threadPool.execute(() -> {
                try {
                    String result = exchange.exchange(" B");
                    System.out.println("B exchange :" + result);
                } catch (InterruptedException e) {
                }
            });
            threadPool.shutdown();
        }
    }