CountDownLatchを書き換えてスレッド状態監視を実現


要件:管理スレッドは、作業スレッドがすべて待機状態にあることをタイムリーに把握し、待機条件を満たして実行を再開する必要があります.
 
アイデア:1つの方法は「心拍法」で、作業スレッドを管理スレッドにタイミングよく「到着」させることです.ここでは,同時状態カウントにより状態監視を実現したい.jdkはjavaという2つの同時状態カウントを提供する.util.concurrent.CountDownLatchとjava.util.concurrent.CyclicBarrier .CountDownLatchが初期化されると、ワークスレッドはcountDownメソッドを呼び出し、カウントが0に減少するまでawaitメソッドを呼び出す他のスレッドはブロックされます.1つまたは複数の他のスレッドが1つのスレッドのセットが一定のポイントを通過するのを待つ場合に適用されますが、1つの限界があります.すなわち、1回でリセットできます.CyclicBarrierはリセットできますが、スレッドのセットがそれぞれあるポイントに達した後、互いに待機して歩調が一致する場合にのみ適用されます.(JDK 1.6参照)
そこで私はCountDownLatchクラスを修正し、連続的な監視のニーズを満たすためにリセットをサポートすることを考えています.
 
コードは次のとおりです.
変更後の新しいカウンタクラス:ResetableCountDownLatch
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.*;

public class ResetableCountDownLatch {
    /**
     * Synchronization control For ResetableCountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setCount(count);
        }
        
        void setCount(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        public int tryAcquireShared(int acquires) {
            return getState() == 0? 1 : -1;
        }

        public boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

    /**
     * Constructs a {@code ResetableCountDownLatch} initialized with the given count.
     *
     * @param count the number of times {@link #countDown} must be invoked
     *        before threads can pass through {@link #await}
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public ResetableCountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    public void countDown() {
        sync.releaseShared(1);
    }

    public long getCount() {
        return sync.getCount();
    }

    //  
    public void setCount(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        sync.setCount(count);
    }

    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }

 
テスト例:
 
    public static void main(String[] args)
    {
        //  , 3 
        ResetableCountDownLatch latch = new ResetableCountDownLatch(3);
        ThreadGroup tg = new ThreadGroup("");

        class RunBody implements Runnable {
            ResetableCountDownLatch latch;
            
            RunBody(ResetableCountDownLatch latch) {
                this.latch = latch;
            }

            @Override
            public void run()
            {
                System.out.println(Thread.currentThread().getName() + " start.");
                
                for(int i=0; i<Thread.currentThread().getId(); ++i) {
                    try
                    {
                        Thread.sleep(1000);

                        //  wait 
                        synchronized(this) {
                            System.out.println(Thread.currentThread().getName() + " wait " + (i+1) + "time(s)");
                            latch.countDown();
                            this.wait();
                        }
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                    
                    System.out.println(Thread.currentThread().getName() + " continue.");
                }

                //  countDown “ ” 
                System.out.println(Thread.currentThread().getName() + " finish.");
                latch.countDown();
            }
            
        }
        
        RunBody threadBody = new RunBody(latch);
        
        for(int i=0; i<3; ++i) {
            new Thread(tg, threadBody).start();
        }

        while(true) {
            try
            {
                latch.await();
                //  
                if(0==tg.activeCount()) {
                    break;
                }

                System.out.println("Main: there are " + tg.activeCount() + " live threads all waiting");
                synchronized(threadBody) {
                    //  , : 
                    latch.setCount(tg.activeCount());
                    System.out.println("Main: wake them up.");
                    threadBody.notifyAll();
                }
            }
            catch (InterruptedException e)
            {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        
        System.out.println("Main: All threads finished.");
    }

 
テスト出力:
Thread-1 start.
Thread-2 start.
Thread-0 start.
Thread-2 wait 1time(s)
Thread-0 wait 1time(s)
Thread-1 wait 1time(s)
Main: there are 3 live threads all waiting
Main: wake them up.
Thread-1 continue.
Thread-2 continue.
Thread-0 continue.
Thread-2 wait 2time(s)
Thread-0 wait 2time(s)
Thread-1 wait 2time(s)
Main: there are 3 live threads all waiting
Main: wake them up.
Thread-1 continue.
Thread-0 continue.
Thread-2 continue.
Thread-2 wait 3time(s)
Thread-1 wait 3time(s)
Thread-0 wait 3time(s)
Main: there are 3 live threads all waiting
Main: wake them up.
Thread-0 continue.
Thread-1 continue.
Thread-2 continue.
Thread-2 wait 4time(s)
Thread-0 wait 4time(s)
Thread-1 wait 4time(s)
Main: there are 3 live threads all waiting
Main: wake them up.
Thread-1 continue.
Thread-2 continue.
Thread-0 continue.
Thread-1 wait 5time(s)
Thread-2 wait 5time(s)
Thread-0 wait 5time(s)
Main: there are 3 live threads all waiting
Main: wake them up.
Thread-0 continue.
Thread-2 continue.
Thread-1 continue.
Thread-0 wait 6time(s)
Thread-1 wait 6time(s)
Thread-2 wait 6time(s)
Main: there are 3 live threads all waiting
Main: wake them up.
Thread-2 continue.
Thread-1 continue.
Thread-0 continue.
Thread-2 wait 7time(s)
Thread-1 wait 7time(s)
Thread-0 wait 7time(s)
Main: there are 3 live threads all waiting
Main: wake them up.
Thread-0 continue.
Thread-1 continue.
Thread-2 continue.
Thread-0 wait 8time(s)
Thread-1 wait 8time(s)
Thread-2 wait 8time(s)
Main: there are 3 live threads all waiting
Main: wake them up.
Thread-1 continue.
Thread-2 continue.
Thread-0 continue.
Thread-0 finish.
Thread-2 wait 9time(s)
Thread-1 wait 9time(s)
Main: there are 2 live threads all waiting
Main: wake them up.
Thread-2 continue.
Thread-1 continue.
Thread-1 finish.
Thread-2 wait 10time(s)
Main: there are 1 live threads all waiting
Main: wake them up.
Thread-2 continue.
Thread-2 finish.
Main: All threads finished.