CountDownLaunchシミュレーションを使用した同時実行

5552 ワード


CountDownLatchは、スレッドの数の初期値を持つカウンタによって実現される.スレッドが自分のタスクを完了するたびに、カウンタの値は1減少します.カウンタ値が0に達すると、すべてのスレッドがタスクを完了したことを示し、ロック(CountDowmLaunch.await()を呼び出すスレッド)で待機しているスレッドが実行タスクを再開できます.
public class Test3 {

    public static void main(String[] args) throws InterruptedException {
        final AtomicInteger atomicInteger = new AtomicInteger();
        final Integer[] ii = {0};
        final CountDownLatch latch1 = new CountDownLatch(1000);
        final CountDownLatch latch2 = new CountDownLatch(1000);

        for (int i = 0; i < 1000; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        //  
                        latch1.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    for (int j = 0; j < 1000; j++) {
                        // ii[0] = ii[0] + 1;
                        atomicInteger.incrementAndGet();
                    }
                    latch2.countDown();
                }
            }).start();
            //  countDownLaunch 1, 1000 ,countdownLaunch 0, 
            latch1.countDown();
        }

        //  
        latch2.await();
        // System.out.println("finish , ii[0] = " + ii[0]);                 //finish , atomicInteger = 612613
        System.out.println("finish , atomicInteger = " + atomicInteger);    //finish , atomicInteger = 1000000
    }
}

CountDownLaunch内部メンテナンス
private final Sync sync;

SyncはAbstractQueuedSynchronizerを実装する内部クラスで、内部はロックメカニズムと類似しており、0はロックなし、0より大きい回数はロック回数を表すstateフィールドがある.
public void countDown() {
   sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
     if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
     }
   return false;
}
 protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
       for (;;) {
          int c = getState();    //  state  
          if (c == 0)
              return false;   // state 0 
          int nextc = c-1;
          if (compareAndSetState(c, nextc))    // cas state , true , 
             return nextc == 0;
      }
}
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)    //  state=0 1 false, 
            doAcquireSharedInterruptibly(arg);
    }
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }
    //  。
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
    }
    private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;    //  , false, 
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }