Javaコンカレント-スレッド同期クラス


译文1著者:インターコンチネンタル1984
原文アドレス:Java並発注中の高度な同期ツール
Javaの同時発注とは、Java.util.concurrent(略称:JUC)パッケージとそのサブパッケージの下にあるクラスとインタフェースを指し、Javaの同時実行にさまざまな機能サポートを提供します.たとえば、次のようなものです.
  • はスレッドプールの作成クラスThreadPoolExecutor、Executorsなどを提供する.
  • は、ロック、ReentrantLockなどの様々なロックを提供している.
  • は、ConcurrentHashMap、LinkedBlockingQueue、DelayQueueなどの様々なスレッドの安全なデータ構造を提供する.
  • は、CountDownLatch、CyclicBarrier、Semaphore、Phaserなどのより高度なスレッド同期構造を提供する.

  • 前の章ではスレッドプールの使用,スレッドセキュリティのデータ構造などについて詳しく説明したが,Javaおよび発注におけるより高度なスレッド同期クラス:CountDownLatch,CyclicBarrier,Semaphore,Phaserなどを重点的に学習する.
    一、CountDownLatchの紹介と使用
    CountDownLatch(閉鎖)は、減算しかできないカウンタと見なすことができ、1つ以上のスレッドがキュー実行を待つことができる.CountDownLatchには2つの重要な方法があります.
  • countDown():カウンタを1減少させる;
  • await():カウンタが0でない場合、このメソッドを呼び出すスレッドはブロックされ、カウンタが0である場合、待機している1つまたはすべてのスレッドを起動することができる.

  • CountDownLatch使用シーン:生活の中の情景を例にとると、例えば病院に行って健康診断を受けると、通常人々は早めに病院に行って列を作っていますが、医者が出勤してから、正式に健康診断を始めることができます.医者もすべての人に健康診断をしてから退勤することができます.このような場合はCountDownLatchを使います.流れは:患者が列に並ぶ→医者が出勤する→健康診断が完了する→医者が退勤する.
    CountDownLatchのサンプルコードは次のとおりです.
    //     
    CountDownLatch hospitalLatch = new CountDownLatch(1);
    //     
    CountDownLatch patientLatch = new CountDownLatch(5);
    System.out.println("    ");
    ExecutorService executorService = Executors.newCachedThreadPool();
    for (int i = 0; i < 5; i++) {
        final int j = i;
        executorService.execute(() -> {
            try {
                hospitalLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("  :" + j);
            patientLatch.countDown();
        });
    }
    System.out.println("    ");
    hospitalLatch.countDown();
    patientLatch.await();
    System.out.println("    ");
    executorService.shutdown();
    
    

    以上のプログラムの実行結果は以下の通りです.
    患者が並ぶ
    医者が出勤する
    健康診断:4
    健康診断:0
    健康診断:1
    健康診断:3
    健康診断:2
    医者が退社する
    二、CyclicBarrierの紹介と使用
    CyclicBarrier(サイクルバリア)は、ある条件を満たすまでスレッドのセットを待機させながら実行することができる.CyclicBarrierのクラシックな使用シーンはバスが発車するので、私たちの定義を簡単に理解するために、各バスは4人でいっぱいになると発車し、後ろから来た人は順番に並んで相応の基準に従います.
    その構造方法は  CyclicBarrier(int parties,Runnable barrierAction)  ここでpartiesは、待機に関与するスレッドがいくつかあることを示し、barrierActionは、条件を満たした後にトリガーされる方法を示す.CyclicBarrierはawait()メソッドを使用して、現在のスレッドがバリアポイントに到達し、ブロックされたことを識別します.
    CyclicBarrierのサンプルコードは次のとおりです.
    import java.util.concurrent.*;
    public class CyclicBarrierTest {
        public static void main(String[] args) throws InterruptedException {
            CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() {
                @Override
                public void run() {
                    System.out.println("   ");
                }
            });
            for (int i = 0; i < 4; i++) {
                new Thread(new CyclicWorker(cyclicBarrier)).start();
            }
        }
        static class CyclicWorker implements Runnable {
            private CyclicBarrier cyclicBarrier;
            CyclicWorker(CyclicBarrier cyclicBarrier) {
                this.cyclicBarrier = cyclicBarrier;
            }
            @Override
            public void run() {
                for (int i = 0; i < 2; i++) {
                    System.out.println("  :" + i);
                    try {
                        cyclicBarrier.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    
    

    以上のプログラムの実行結果は以下の通りです.
    乗客:0
    乗客:0
    乗客:0
    乗客:0
    発車した
    乗客:1
    乗客:1
    乗客:1
    乗客:1
    発車した
    三、Semaphoreの紹介と使用
    Semaphore(信号量)は、マルチスレッドにおける制御リソースのアクセスと使用を管理するために使用される.Semaphoreは駐車場の警備員のようなもので、駐車スペースの使用資源を制御することができます.如来より5台、2台しかないので、警備員は先に2台の車を入れて入ることができて、車が出てから、後ろの車を入れることができます.
    Semaphoreのサンプルコードは次のとおりです.
    Semaphore semaphore = new Semaphore(2);
    ThreadPoolExecutor semaphoreThread = new ThreadPoolExecutor(10, 50, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    for (int i = 0; i < 5; i++) {
        semaphoreThread.execute(() -> {
            try {
                //       
                semaphore.acquire();
                System.out.println("Thread:" + Thread.currentThread().getName() + "   :" + LocalDateTime.now());
                TimeUnit.SECONDS.sleep(2);
                //     
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
    
    

    以上のプログラムの実行結果は以下の通りです.
    Thread:pool-1-thread-1時間:2019-07-10 21:18:42
    Thread:pool-1-thread-2時間:2019-07-10 21:18:42
    Thread:pool-1-thread-3時間:2019-07-10 21:18:44
    Thread:pool-1-thread-4時間:2019-07-10 21:18:44
    Thread:pool-1-thread-5時間:2019-07-10 21:18:46
    四、Phaser(位相シフタ)
    JDK 7が提供する機能は、すべてのスレッドが到着するのを待ってから、新しいタスクのセットを続行または開始することです.例えば旅行団があって、私達はすべてのメンバーがすべて指定の場所に到着した後に、やっと発車して観光地の1に行くことができて、観光地に到着した後にそれぞれ遊ぶことができて、その後すべて指定の場所に到着しなければならなくて、やっと発車して次の観光地に行くことができて、このようなシーンはPhaserを使うのにとても適しています.
    Phaserのサンプルコードは次のとおりです.
    public class Lesson5\_6 {
        public static void main(String[] args) throws InterruptedException {
            Phaser phaser = new MyPhaser();
            PhaserWorker[] phaserWorkers = new PhaserWorker[5];
            for (int i = 0; i < phaserWorkers.length; i++) {
                phaserWorkers[i] = new PhaserWorker(phaser);
                //    Phaser       ,          +1
                phaser.register();
            }
            for (int i = 0; i < phaserWorkers.length; i++) {
                //     
                new Thread(new PhaserWorker(phaser)).start();
            }
        }
        static class PhaserWorker implements Runnable {
            private final Phaser phaser;
            public PhaserWorker(Phaser phaser) {
                this.phaser = phaser;
            }
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " |   " );
                phaser.arriveAndAwaitAdvance(); //       
                try {
                    Thread.sleep(new Random().nextInt(5) * 1000);
                    System.out.println(Thread.currentThread().getName() + " |   " );
                    phaser.arriveAndAwaitAdvance(); //    1       
                    Thread.sleep(new Random().nextInt(5) * 1000);
                    System.out.println(Thread.currentThread().getName() + " |   " );
                    phaser.arriveAndAwaitAdvance(); //    2       
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        // Phaser              
        static class MyPhaser extends Phaser{
            @Override
            protected boolean onAdvance(int phase, int registeredParties) { //             
                switch (phase) {
                    case 0:
                        System.out.println("====        ====");
                        return false;
                    case 1:
                        System.out.println("====   1    ,         ====");
                        return false;
                    case 2:
                        System.out.println("====   2    ,     ====");
                        return false;
                    default:
                        return true;
                }
            }
        }
    }
    
    

    以上のプログラムの実行結果は以下の通りです.
    Thread-0|到着
    Thread-4|到着
    Thread-3|到着
    Thread-1|到着
    Thread-2|到着
    ====集合完了発車===
    Thread-0|到着
    Thread-4|到着
    Thread-1|到着
    Thread-3|到着
    Thread-2|到着
    ====スポット1の集合が終わり、発車して次のスポットへ===
    Thread-4|到着
    Thread-3|到着
    Thread-2|到着
    Thread-1|到着
    Thread-0|到着
    ====スポット2の集合が終わり、発車して家に帰る===
    面接問題について
    1.次のクラスで、リソースのグループへのアクセス権を制御するクラスはどれですか?
    A:Phaser
    B:Semaphore
    C:CountDownLatch
    D:CyclicBarrier
    A:B
    2.次のどのクラスが再利用できませんか?
  • A:Phaser
  • B:Semaphore
  • C:CountDownLatch
  • D:CyclicBarrier

  • 答え:C
    3.次のどの方法がCountDownLatchクラスに属していませんか?
  • A:await()
  • B:countDown()
  • C:getCount()
  • D:release()

  • 答え:D.release()はSemaphoreのリリースライセンスのメソッドであり、CountDownLatchクラスにはこのメソッドは含まれていません.
    4.CyclicBarrierとCountDownLatchの違いは何ですか?
    答え:CyclicBarrierとCountDownLatchは本質的にvolatileとCASに依存して実現され、それらの違いは以下の通りである.
  • CountDownLatchは1回しか使用できませんが、CyclicBarrierは複数回使用できます.
  • CountDownLatchは手動で1つ以上のスレッドの実行完了を待ってから実行することを指定しますが、CyclicBarrierはnつのスレッドが互いに待機しています.いずれのスレッドが完了する前に、すべてのスレッドが待機する必要があります.

  • 5.次のどのクラスにawait()メソッドが含まれていませんか?
  • A:Semaphore
  • B:CountDownLatch
  • C:CyclicBarrier

  • A:A
    6.次のプログラムの実行にどれくらいの時間がかかりましたか.
    Semaphore semaphore = new Semaphore(2);
    ThreadPoolExecutor semaphoreThread = new ThreadPoolExecutor(10, 50, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    for (int i = 0; i < 3; i++) {
        semaphoreThread.execute(() -> {
            try {
                semaphore.release();
                System.out.println("Hello");
                TimeUnit.SECONDS.sleep(2);
                semaphore.acquire();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
    
    
  • A:1 s以内
  • B:2 s以上
  • A:はい.ループはrelease()すなわちライセンスを解放する方法を先に実行するので,プログラムは一度に3つのスレッドを実行し,同時に1 s以内に実行することができる.
    7.Semaphoreでよく使われる方法は何ですか?
    答え:よく使われる方法は以下の通りです.
  • acquire():ライセンスを取得します.
  • release():ライセンスを解放します.
  • availablePermits():現在使用可能なライセンス数.
  • acquire(int n):n個のライセンスを取得して使用します.
  • release(int n):n個のライセンスを解放します.

  • 8.Phaserの一般的な方法は何ですか.
    答え:よく使われる方法は以下の通りです.
  • register():Phaser
  • に新しい参加者を登録します.
  • arriveAndAwaitAdvance():他のスレッドが
  • を実行するのを待つ
  • arriveAndDeregister():このスレッド
  • をログアウトします.
  • forceTermination():Phaserを終端状態
  • に強制する
  • isTerminated():Phaserが
  • を終了するかどうかを判断する
    9.次のプログラムは正常に実行できますか?「発車しました」は何回印刷されましたか?
    import java.util.concurrent.*;
    public class TestMain {
        public static void main(String[] args) {
            CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() {
                @Override
                public void run() {
                    System.out.println("   ");
                }
            });
            for (int i = 0; i < 4; i++) {
                new Thread(new CyclicWorker(cyclicBarrier)).start();
            }
        }
        static class CyclicWorker implements Runnable {
            private CyclicBarrier cyclicBarrier;
    
            CyclicWorker(CyclicBarrier cyclicBarrier) {
                this.cyclicBarrier = cyclicBarrier;
            }
            @Override
            public void run() {
                for (int i = 0; i < 2; i++) {
                    System.out.println("  :" + i);
                    try {
                        cyclicBarrier.await();
                        System.out.println("   II:" + i);
                        cyclicBarrier.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    
    

    A:正常に実行できます.await()を2回実行したので、「発車しました」を4回印刷しました.
    まとめ
    ここではsynchronizedよりも高度な4つのスレッド同期クラスを紹介し、CountDownLatch、CyclicBarrier、Phaser機能はスレッド間の待機を実現するのと似ていますが、その側面は異なります.CountDownLatchは一般的に1つ以上のスレッドが実行されるのを待ってから現在のスレッドを実行するために使用されます.またCountDownLatchは繰り返し使用できません.CyclicBarrierは、スレッドリソースのセットがバリアポイントに入ってから共同で実行されるのを待つために使用される.PhaserはJDK 7が提供するより強力で柔軟なスレッド支援ツールであり、すべてのスレッドが達成されるのを待って、新しいタスクのセットを継続または開始し、Phaserはスレッドの同期個数を動的に増加および除去する機能を提供します.Semaphoreが提供する機能は、リソースのアクセス権のセットを制御するためのロックに似ています.