JUCコンカレントキットのCountDownLatch

25306 ワード

1、紹介
この文書では、CountDownLatchについて説明し、CountDownLatchを使用することで、1つのスレッドを他の1つ以上のスレッドが実行されるまでブロックすることができます.
A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
2、同時プログラミングにおける使い方
簡単に言えば、CountDownLatchにはcounter属性があり、私たちのニーズに応じて減少することができ、counterが0に減少するまで呼び出されているスレッドをブロックすることができます.
並列処理を行う場合は、CountDownLatchのcounter値を実行するスレッドの数に初期化できます.各スレッドの実行が終了したときにcountdownメソッドを呼び出すことができます.この場合、現在のメインスレッド呼び出しawaitメソッドは、他のworkerスレッドが完了するまでブロックされます.
3、複数スレッドの処理完了待ち
Workerスレッドを作成し、CountDownLatchプロパティを設定することで、スレッドの実行が完了したときに通知できます.
public class Worker implements Runnable { private List> outputScraper; private CountDownLatch countDownLatch; public Worker(List> outputScraper, CountDownLatch countDownLatch) { this.outputScraper = outputScraper; this.countDownLatch = countDownLatch; } @Override public void run() { doSomeWork(); outputScraper.add("Counted down"); countDownLatch.countDown(); } } 

次に、メインスレッドがワークスレッドの完了にブロックされることを検証するテスト例を作成します.
@Test
public void whenParallelProcessing_thenMainThreadWillBlockUntilCompletion() throws InterruptedException { List> outputScraper = Collections.synchronizedList(new ArrayList<>()); CountDownLatch countDownLatch = new CountDownLatch(5); List> workers = Stream .generate(() -> new Thread(new Worker(outputScraper, countDownLatch))) .limit(5) .collect(toList()); workers.forEach(Thread::start); countDownLatch.await(); outputScraper.add("Latch released"); assertThat(outputScraper) .containsExactly( "Counted down", "Counted down", "Counted down", "Counted down", "Counted down", "Latch released" ); } 

他の作業スレッドに依存してCountDownLatchの解放が完了するため、「Latch released」は常に最後に出力されることが明らかになった.
注意awaitメソッドを呼び出さないと、スレッドの実行順序を保証できないため、最後のテスト結果はランダムに失敗します.
4、複数のスレッドが運転開始を待つ
次に、上記の例を示しますが、今回は5つではなく千個以上のスレッドを実行しています.このうち、前に実行されたスレッドは、後ろに実行されていないうちに終了している可能性が高いので、すべてのスレッドを並列に実行する方法がないため、同時問題を再現することは難しいです.
この問題を解決するために、今回CountDownLatchを使用する方法は、上記の例とは異なり、メインスレッドをブロックしてサブスレッドの実行が終了するのを待つよりも、すべてのサブスレッドが起動して完了するまですべてのサブスレッドをブロックすることができます.
前例のrunメソッドを変更して、本番前にブロックし続けます.
public class WaitingWorker implements Runnable { private List> outputScraper; private CountDownLatch readyThreadCounter; private CountDownLatch callingThreadBlocker; private CountDownLatch completedThreadCounter; public WaitingWorker( List> outputScraper, CountDownLatch readyThreadCounter, CountDownLatch callingThreadBlocker, CountDownLatch completedThreadCounter) { this.outputScraper = outputScraper; this.readyThreadCounter = readyThreadCounter; this.callingThreadBlocker = callingThreadBlocker; this.completedThreadCounter = completedThreadCounter; } @Override public void run() { readyThreadCounter.countDown(); try { callingThreadBlocker.await(); doSomeWork(); outputScraper.add("Counted down"); } catch (InterruptedException e) { e.printStackTrace(); } finally { completedThreadCounter.countDown(); } } } 

次に、テスト例を変更して、すべてのサブスレッドが正式に実行されるまでブロックし、ブロックを開いてすべてのサブスレッドを実行し、最後にメインスレッドがすべてのサブスレッドが実行されるまでブロックします.
@Test
public void whenDoingLotsOfThreadsInParallel_thenStartThemAtTheSameTime() throws InterruptedException { List> outputScraper = Collections.synchronizedList(new ArrayList<>()); CountDownLatch readyThreadCounter = new CountDownLatch(5); CountDownLatch callingThreadBlocker = new CountDownLatch(1); CountDownLatch completedThreadCounter = new CountDownLatch(5); List> workers = Stream .generate(() -> new Thread(new WaitingWorker( outputScraper, readyThreadCounter, callingThreadBlocker, completedThreadCounter))) .limit(5) .collect(toList()); workers.forEach(Thread::start); readyThreadCounter.await(); outputScraper.add("Workers ready"); callingThreadBlocker.countDown(); completedThreadCounter.await(); outputScraper.add("Workers complete"); assertThat(outputScraper) .containsExactly( "Workers ready", "Counted down", "Counted down", "Counted down", "Counted down", "Counted down", "Workers complete" ); } 

このモードは、数千のスレッドが論理を並列に実行できるため、同時問題の再現に非常に役立ちます.
5、CountDownLatchの早期終了
countdown()メソッドの実行前にスレッドが終了する場合があります.これにより、CountDownLatchは0未満に減少し、最終的にメインスレッドがブロックされます.
@Override
public void run() { if (true) { throw new RuntimeException("Oh dear, I'm a BrokenWorker"); } countDownLatch.countDown(); outputScraper.add("Counted down"); } 

await()メソッドが常にブロックされていることを説明する前のテスト例を変更します.
@Test
public void whenFailingToParallelProcess_thenMainThreadShouldGetNotGetStuck() throws InterruptedException { List> outputScraper = Collections.synchronizedList(new ArrayList<>()); CountDownLatch countDownLatch = new CountDownLatch(5); List> workers = Stream .generate(() -> new Thread(new BrokenWorker(outputScraper, countDownLatch))) .limit(5) .collect(toList()); workers.forEach(Thread::start); countDownLatch.await(); } 

明らかに、これは私たちが望んでいる結果ではなく、プログラムは永遠にブロックされているのではなく、前に実行しなければなりません.
この問題を解決するためにawait()メソッドを呼び出すときにtimeoutのパラメータを追加します.
boolean completed = countDownLatch.await(3L, TimeUnit.SECONDS); assertThat(completed).isFalse(); 

これにより、テスト・インスタンスは最終的にタイムアウトし、await()はfalseを返します.
6、まとめ
  • CountDownLatchを使用して、他のスレッドが実行されるまでメインスレッドをブロックする方法を示しました.
  • また、 の問題をデバッグするために複数のスレッド を実行できることを示した.

  • 最後にこれらの例の実装はGithubで見つけることができます
    原文住所:https://www.baeldung.com/java-countdown-latch