Java同時プログラミング(十五):CountDownLatchソースコード逐行深さ分析


前言
 CountDownLatchはカウンタ(またはstateフィールド)を維持し、countDownメソッドを呼び出すとカウンタが1減少し、awaitメソッドを呼び出すとカウンタが0になるまでスレッドがブロックされます.1つのスレッドがすべてのサブスレッドタスクの完了を待ってから実行を継続する論理を実現するために使用することもできるし、複数のスレッドを待機させながら論理の実行を開始する目的を達成するために、簡易CyclicBarrierのような機能を実現することもできる.前のSemaphoreソース分析とReentrantLockソース分析の基礎があり、CountDownLatchのソースコードを見ると簡単です.
使用
  • あるスレッドは、他のスレッドの実行が完了するのを待ってから
  • を実行する.
    	......
    	CountDownLatch cdl = new CountDownLatch(10);
    	ExecutorService es = Executors.newFixedThreadPool(10);
    	for (int i = 0; i < 10; i++) {
         
    		es.execute(() -> {
         
    			//do something
    			cdl.countDown();
    		});
    	}
    	cdl.await();
    	......
    
  • CyclicBarrierのような機能を実現し、awaitを先に、countDown
  • 	......
            CountDownLatch cdl = new CountDownLatch(1);
            ExecutorService es = Executors.newFixedThreadPool(10);
            for (int i = 0; i < 10; i++) {
         
                es.execute(() -> {
         
                    cdl.await();
                    //do something
                });
            }
            Thread.sleep(10000L);
            cdl.countDown();
            ......
    

    ソース分析
     CountDownLatchの構造はReentrantLock,Semaphoreの構造と類似しており,内部クラスSyncがAQSを継承する方式でもあり,tryAcquireSharedとtryReleaseSharedメソッドを書き換えた.まず、コンストラクション関数を見てみましょう.
    public CountDownLatch(int count) {
         
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
    

      0より大きいcountを入力する必要があります.CountDownLatchカウンタの初期値を表し、Syncのコンストラクション関数によって最終的に親AQSのstateフィールドに値を割り当てます.このstateフィールドは、ReentrantLockでは0と1を使用してロックの状態を識別し、Semaphoreでは信号量を識別し、ここではカウンタを表すために使用されることがわかります. CountDownLatchはawaitメソッドでブロックを完了するには、まずこのメソッドがどのように実現されているかを見てみましょう.
    public void await() throws InterruptedException {
         
            sync.acquireSharedInterruptibly(1);
        }
    

      はsyncのacquireSharedInterruptiblyメソッドを呼び出します.このメソッドはAQSで定義され、Semaphoreも呼び出すこのメソッドです.
    public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
         
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
    

      このメソッドの論理は先にSemaPhoreを解析する際に詳しく述べたが,ここではこれ以上述べないが,主に2つのメソッドの呼び出しであり,tryAcquireSharedメソッドで「ライセンス」を取得しようと試みたが,戻り値は今回の取得後の残量を表し,0以上であれば取得に成功し,そうでなければ失敗を示す.失敗すると、doAcquireSharedInterruptiblyメソッドがエンキューブロックを実行する論理に入ります.ここでは主にCountDownLatchでtryAcquireSharedメソッドの実装を見てみましょう.
    protected int tryAcquireShared(int acquires) {
         
                return (getState() == 0) ? 1 : -1;
            }
    

      とSemaphoreの実装では、stateからrequiresを減算するたびに異なり、ここではstateが0であるかどうかを直接判断し、0であれば1を返し、「ライセンス」の取得に成功したことを示す.0でない場合、失敗を表す場合は、エンキューブロックが必要です.このtryAcquireSharedメソッドからCountDownLatchの論理がわかります.stateが0になるまで、すべてのスレッドが実行許可を取得できます.  doAcquireSharedInterruptibly法について,Semaphoreの文章で詳細に解析したが,Semaphoreソースコード解析を見ることができる.
    では、countDownメソッドに戻ります.
    public void countDown() {
         
            sync.releaseShared(1);
        }
    

      はsyncのreleaseSharedメソッドを呼び出します.このメソッドは親AQSに定義され、Semaphoreもこのメソッドを使用します.
    public final boolean releaseShared(int arg) {
         
            if (tryReleaseShared(arg)) {
         
            	// state  
                doReleaseShared();
                return true;
            }
            return false;
        }
    

    前述のCountDownLatchもtryReleaseSharedメソッドを書き換えています.
    protected boolean tryReleaseShared(int releases) {
         
                // Decrement count; signal when transition to zero
                for (;;) {
         
                    int c = getState();
                    if (c == 0)
                    	//  state  0     false
                    	//        ,           true
                    	//     countDown     state    
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                    	//    true,  state  0   0
                    	//            
                        return nextc == 0;
                }
            }
    

      Semaphoreは、信号量を解放するときは取得したライセンスをstateに返却するが、CountDownLatchはライセンスを取得していないロジック(ライセンスを取得するときはstateが0に等しいか否かを判断する)であるため、countDownのときに解放していないロジックは、stateを1減らしてstateを1に減らした後の値が0になったか否かによってreleaseが成功したか否かを判断し、stateが本来0より大きく、1を減らして0になるとtrueを返します.tryReleaseSharedメソッドの戻り値は、doReleaseSharedメソッドを呼び出してブロッキングスレッドを起動する必要があるかどうかを決定します.
    ここにはstateが0の場合falseを返す論理があります.これは主に2つの状況に対応しています.
  • countDownを呼び出す回数はstateの初期値
  • を超える.
  • マルチスレッド同時呼び出しの場合、ブロックスレッドの起動動作
  • を完了するには、1つのスレッドしかないことを保証する.
      CountDownLatchにはロックがないという概念が見られ、countDownメソッドはスレッドによって繰り返し呼び出され、stateに対してreduce操作を行うだけで、誰がreduceをしたのか気にしない.tryReleaseSharedがtrueを返す場合は、Semaphoreで呼び出されたメソッドと同じ方法で、主にブロックスレッドを起動するかPROPAGAGE状態を設定する必要があることを示します.ここでは、~ ブロックスレッドが起動するとdoAcquirereShadInterruptiblyメソッドでループが継続します.Semaphore呼び出しと同じ方法ですが、ここには違うところがあるので一言.まずdoAcquireSharedInterruptiblyメソッドに戻ります.
     private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
         
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
         
                for (;;) {
         
                    final Node p = node.predecessor();
                    if (p == head) {
         
                    	//  head.next unpark  ,    state==0
                    	//  tryAcquireShared   1
                        int r = tryAcquireShared(arg);
                        //r==1
                        if (r >= 0) {
         
                        	//node      ,      node.next
                        	//      ,      r   1
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
         
                if (failed)
                    cancelAcquire(node);
            }
        }
    

     head.nextスレッドがunparkによって起動されると、tryAcquireSharedメソッドの判断に入ります.この時点でstateは0(stateが0になった場合にのみunparkがスレッドを起動します)になっているため、CountDownLatchが書き換えたtryAcquireSharedでは、state=0の場合、1が返されるため、setHeadAndPropagateメソッドに入ります.
    private void setHeadAndPropagate(Node node, int propagate) {
         
            Node h = head; // Record old head for check below
            setHead(node);
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
         
                Node s = node.next;
                if (s == null || s.isShared())
                    doReleaseShared();
            }
        }
    

    この方法はSemaphoreで詳しく紹介されていますが、ここではCountDownLatchの観点から見てみましょう.実はとても簡単で、この時この方法のpropagateパラメータ値が1であることに注意して、それでは次のif論理の中に入って、引き続き次のnodeを呼び覚まします.次のnodeに対応するスレッドが起動すると、setHeadAndPropagateメソッドに進み、propagageは同じ1になります.次のnodeを起動し続け、CLHキュー全体のノードを順次起動します.
    まとめ
    単独でCountDownLatchを出してみると実は複雑ですが、CountDownLatch(SemaphoreとReentrantLockを含む)はAQSが提供するいくつかの方法を高度に共用しています.これらの方法は、先にSemaphoreとReentrantLockを紹介したときに詳細に分析されていたので、本稿でCountDownLatchを分析するとき、内部クラスSync書き換えの2つの方法:tryAcquireSharedとtryReleaseShared、すなわち「ライセンスの取得」と「ライセンスの解放」の論理に注目する必要があります.
     CountDownLatch awaitの論理では、現在のstateの値が0より大きい場合、CLHキューに入ってunparkの起動を待つ(または起動を中断する).countDownの論理では、簡単にstate-1を作成し、1つのスレッドがstateを1から0に減らすと、そのスレッドはhead.nextノードを起動し、head.nextノードが起動した後、setHeadAndPropagateメソッドでnext.nextノードを起動し、すべてのCLHキュー内のブロックノードを順次起動します.もちろん、スレッドが中断して起動されると、cancelAcquireで無効なノードの削除ロジックにも入ります.