JAVA同時CountDownLatch
22464 ワード
前言
以前、ReentranLockをロックレベルからAQSソースレベルに至るまで、必要な同期器を構築する方法、AQSを使用するにはどのような方法が必要かを分析しました.ReentranLockのCLHキューでは各ノードが独占(EXCLUSIVE)されていますが、ノードのもう一つの待ち方であるシェア(SHARED)はどうなるのでしょうか.このブログでは、もう一つの同期器である閉鎖(CountDownLatch)で、何が違うのかを調べてみましょう.
CountdownLatchの役割と主な流れ:
CountDownLatchは、他のスレッドがそれぞれの作業を完了するのを待ってから、1つのスレッドを実行し続けることができます.カウンタを使用して実装します.カウンタの初期値はスレッドの数です.各スレッドが自分のタスクを完了すると、カウンタの値は1減少します.カウンタの値が0の場合、すべてのスレッドがタスクを完了したことを示し、CountDownLatchで待機しているスレッドがタスクの実行を再開します.
CountdownLatchの例:
ここではjdkソースコードの例を使用します.
1 class Driver { // ...
2 void main() throws InterruptedException {
3 CountDownLatch startSignal = new CountDownLatch(1);//
4 CountDownLatch doneSignal = new CountDownLatch(N);//
5 for (int i = 0; i < N; ++i) // create and start threads
6 new Thread(new Worker(startSignal, doneSignal)).start();
7 doSomethingElse(); // ( )
8 startSignal.countDown(); // -1( work await )
9 doSomethingElse();
10 doneSignal.await(); // wait for all to finish
11 }
12 }
13 class Worker implements Runnable {
14 private final CountDownLatch startSignal;
15 private final CountDownLatch doneSignal;
16 Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
17 this.startSignal = startSignal;
18 this.doneSignal = doneSignal;
19 }
20 public void run() {
21 try {
22 startSignal.await();//
23 doWork();
24 doneSignal.countDown();// -1
25 } catch (InterruptedException ex) {
26 } // return;
27 }
28 void doWork() { ... }
29 }
CountdownLatchは、主スレッドおよび他のスレッドの優先順位を保証することができ、例では、主スレッドの最初のdoSomethingElseが他のスレッドより先に、実行後、startSignalを表示することができる.countDown()は、startSignalから他のスレッドを使用します.await()のブロック状態が解除されます.
ロックは、A操作がB操作の結果に依存するなど、いくつかの操作の実行順序を保証し、ロックでA操作をブロックし、B操作が完了した後に解放することができる.
CountDownLatchソース:
CountdownLatchのソースコードは多くありません(結局AQSは下層のいくつかの需要をパッケージしました)、コードを見てみましょう:
1 // Sync, AQS
2 private static final class Sync extends AbstractQueuedSynchronizer {
3 private static final long serialVersionUID = 4982264981922014374L;
4 Sync(int count) {
5 setState(count);
6 }
7 int getCount() {
8 return getState();
9 }
10 // , state 0, , , -1
11 protected int tryAcquireShared(int acquires) {
12 return (getState() == 0) ? 1 : -1;
13 }
14
15 //
16 protected boolean tryReleaseShared(int releases) {
17 // Decrement count; signal when transition to zero
18 for (;;) {
19 int c = getState();
20 if (c == 0)
21 return false;
22 int nextc = c-1;
23 if (compareAndSetState(c, nextc))
24 return nextc == 0;// (-1) state 0
25 }
26 }
27 }
28 private final Sync sync;
29
30 public CountDownLatch(int count) {
31 if (count < 0) throw new IllegalArgumentException("count < 0");
32 this.sync = new Sync(count);
33 }
34
35 // state 0 ,
36 public void await() throws InterruptedException {
37 sync.acquireSharedInterruptibly(1);
38 }
39
40 public boolean await(long timeout, TimeUnit unit)
41 throws InterruptedException {
42 return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
43 }
44
45 // 1
46 public void countDown() {
47 sync.releaseShared(1);
48 }
49
50 public long getCount() {
51 return sync.getCount();
52 }
以上は基本的にCountdownLatchのすべてのソースコードです.await()メソッドからAQSに進み、上記のコードに基づいてawaitはacquireSharedInterruptibly()メソッドを直接呼び出しました.
1 public final void acquireSharedInterruptibly(int arg)
2 throws InterruptedException {
3 if (Thread.interrupted())// ,
4 throw new InterruptedException();
5 if (tryAcquireShared(arg) < 0)// state 0
6 doAcquireSharedInterruptibly(arg);
7 }
中断されずにstateが0でない場合(つまり、ロックを閉じるにはスレッドをロックする)、doAcquireSharedInterruptiblyメソッドが何をしているのかを見てみましょう.CountdownLatchの役割から、CLHキューにスレッドを入れてparkが休憩を待っているのではないかと推測し、ソースコードが私たちが推測しているようにしましょう.
1 private void doAcquireSharedInterruptibly(int arg)
2 throws InterruptedException {
3 final Node node = addWaiter(Node.SHARED);
4 boolean failed = true;
5 try {
6 for (;;) {
7 final Node p = node.predecessor();
8 if (p == head) {
9 int r = tryAcquireShared(arg);// state 0,r 0( )
10 if (r >= 0) {
11 setHeadAndPropagate(node, r);
12 p.next = null; // help GC
13 failed = false;
14 return;
15 }
16 }
17 if (shouldParkAfterFailedAcquire(p, node) &&
18 parkAndCheckInterrupt())//
19 throw new InterruptedException();
20 }
21 } finally {
22 if (failed)
23 cancelAcquire(node);
24 }
25 }
この方法は見覚えがあるのではないでしょうか.Reentranlockで使用されているdoAcquireInterruptiblyメソッドと同様に、スピンを続け、リソース(tryAcquireShared)を取得しようとします.成功すれば戻り、失敗すれば休憩が必要かどうかを判断します.具体的な詳細は議論を展開しないで、第1編のAQSの概要の中で私達は討論して、AQSの底層はただ1種のtryAcquire/tryReleaseを実現することを要求して、共有(shared)が応答中断(Interruptibly)を4対に組み合わせる必要があるかどうかによって、そのため1つのAQSの同期器を完全に見終わって、更に他の同期器を見る時、基本的に同じ流れです.
countDownメソッドが何をしたか見てみましょう.
1 public void countDown() {
2 sync.releaseShared(1);
3 }
4 public final boolean releaseShared(int arg) {
5 if (tryReleaseShared(arg)) {// state 0
6 doReleaseShared();
7 return true;
8 }
9 return false;
10 }
11 private void doReleaseShared() {// CLH signal unpark
12 for (;;) {
13 Node h = head;
14 if (h != null && h != tail) {
15 int ws = h.waitStatus;
16 if (ws == Node.SIGNAL) {
17 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
18 continue; // loop to recheck cases
19 unparkSuccessor(h);
20 }
21 else if (ws == 0 &&
22 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
23 continue; // loop on failed CAS
24 }
25 if (h == head) // loop if head changed
26 break;
27 }
28 }
countDownメソッドはstateを1つ減らしてみます.stateが0の場合、doReleaseSharedメソッドに入り、CLHキューで待機しているすべてのノードをunparkします(共有ノードであるため).
まとめ:
本編CountDownLatchは簡単にそのソースコードを分析して、いくつかの主要な方法に対して、私はすべてコードの上で簡単な注釈を加えて、AQSに対して一定の理解のある人はきっとその中の流れと操作を理解することができます.
転載先:https://www.cnblogs.com/zzzdp/p/9330353.html