CountDownLaunchシミュレーションを使用した同時実行
5552 ワード
CountDownLatchは、スレッドの数の初期値を持つカウンタによって実現される.スレッドが自分のタスクを完了するたびに、カウンタの値は1減少します.カウンタ値が0に達すると、すべてのスレッドがタスクを完了したことを示し、ロック(CountDowmLaunch.await()を呼び出すスレッド)で待機しているスレッドが実行タスクを再開できます.
public class Test3 {
public static void main(String[] args) throws InterruptedException {
final AtomicInteger atomicInteger = new AtomicInteger();
final Integer[] ii = {0};
final CountDownLatch latch1 = new CountDownLatch(1000);
final CountDownLatch latch2 = new CountDownLatch(1000);
for (int i = 0; i < 1000; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
//
latch1.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int j = 0; j < 1000; j++) {
// ii[0] = ii[0] + 1;
atomicInteger.incrementAndGet();
}
latch2.countDown();
}
}).start();
// countDownLaunch 1, 1000 ,countdownLaunch 0,
latch1.countDown();
}
//
latch2.await();
// System.out.println("finish , ii[0] = " + ii[0]); //finish , atomicInteger = 612613
System.out.println("finish , atomicInteger = " + atomicInteger); //finish , atomicInteger = 1000000
}
}
CountDownLaunch内部メンテナンス
private final Sync sync;
SyncはAbstractQueuedSynchronizerを実装する内部クラスで、内部はロックメカニズムと類似しており、0はロックなし、0より大きい回数はロック回数を表すstateフィールドがある.
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState(); // state
if (c == 0)
return false; // state 0
int nextc = c-1;
if (compareAndSetState(c, nextc)) // cas state , true ,
return nextc == 0;
}
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) // state=0 1 false,
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false; // , false,
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}