同時12:CountDownLatch、CyclicBarrier、Semaphore実装分析

11010 ワード

J.U.Cには、3つの同期ツールCountDownLatch、CyclicBarrier、Semaphoreが用意されており、スレッドのタスク調整に使用される共有ロックの特殊なアプリケーションです.

CountDownLatch


小さな栗:
public class CountDownLatchTest {
    public static void main(String[] args) {
        final CountDownLatch latch = new CountDownLatch(2);
        new Thread() {
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() + ": ");
                    Thread.sleep(10000);
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
        }.start();

        new Thread() {
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() + ": ");
                    Thread.sleep(10000);
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
        }.start();

        new Thread() {
            public void run() {
                try {
                    System.out.println(" ");
                    latch.await();
                    System.out.println(Thread.currentThread().getName() + ": ");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
        }.start();
    }
}

出力:「Thread-0:二次表生成、Thread-1:二次表生成、二次表生成完了待ち」を行い、Thread-0、Thread-1の実行が完了するまで待ち、「Thread-2:集計統計」を開始します.
CountDownLatchは、スレッドが他のNスレッドが先に実行されてから実行を開始するのを待つことができるカウントダウン式のカウンタです.
CountDownLatchはAQSに基づいて、共有ロックであり、await()は現在のスレッドをブロックして待機させ、countDown()カウンタは減少する.
// CountDownLatch aqs :
‘’private static final class Sync 
                extends AbstractQueuedSynchronizer {
    Sync(int count) {
        setState(count);
    }
    int getCount() {
        return getState();
    }
    // 
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }
    // 
    protected boolean tryReleaseShared(int releases) {
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

countはカウントダウンの初期数です.
await()はtryAcquireShared(1)メソッドを呼び出してロックを取得し、共有ロックの実装の戻り値が0未満の場合、スレッドがブロックされて待機します.つまりstate=0の場合にのみロックが正常に取得されます.
countDown()はtryReleaseShared(1)メソッドを呼び出してロックを解除し、state値が0の場合、共有ロックは完全に解放され、キュー内で待機しているスレッドが呼び出されます.
CountDownLatchはリセット操作がなく、stateの値が0のときにawait()を呼び出すとスレッドがブロックされないので、CountDownLatchは一度しか使用できません.

CyclicBarrier


小さな栗:
public class CyclicBarrierTest {
    public static void main(String[] args) {
        final CyclicBarrier barrier = new CyclicBarrier(3);
        new Thread() {
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() + ": ");
                    barrier.await();
                    System.out.println(Thread.currentThread().getName() + ": ");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            };
        }.start();

        new Thread() {
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() + ": ");
                    barrier.await();
                    System.out.println(Thread.currentThread().getName() + ": ");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            };
        }.start();

        try {
            Thread.sleep(10000);//  
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        
        new Thread() {
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() + ": ");
                    barrier.await();
                    System.out.println(Thread.currentThread().getName() + ": ");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            };
        }.start();
    }
}

出力結果:「Thread-0:計算完了、Thread-1:計算完了」、「Thread-2:計算完了」まで待ち、「Thread-0:入庫、Thread-1:入庫、Thread-2:入庫」.
CyclicBarrierはサイクル可能な同期バリアであり、ブロックスレッドの数がバリアポイントに達するまでバリアが破られ、Nスレッドが実行され続ける.
CyclicBarrierは、再ロック実装を使用して、バリアポイントparties、すなわちブロックするスレッドの数を初期化時に入力し、バリアが破られたときに実行されるRunnable実装barrierActionを入力することもできます.バリアが破られない前にawait()メソッドを呼び出すスレッドはブロックされます.
public class CyclicBarrier {
    /**   */
    private static class Generation {
        boolean broken = false;
    }
    /**   */
    private final ReentrantLock lock = new ReentrantLock();
    /** condition */
    private final Condition trip = lock.newCondition();
    /**   */
    private final int parties;
    /**   */
    private final Runnable barrierCommand;
    /**   */
    private Generation generation = new Generation();
    /**   */
    private int count;
    ... ...
}

ブロッキング・フロー:
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException, TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();// 
    try {
        // 
        final Generation g = generation;
        if (g.broken)//  s1
            throw new BrokenBarrierException();
        // 
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        // 
        int index = --count;
        //  s3
        if (index == 0) { 
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                // barrierCommand
                if (command != null)
                    command.run();
                ranAction = true;
                // 
                // generation
                // 
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }
        for (;;) {// s2
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && !g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }
            if (g.broken)
                throw new BrokenBarrierException();
            if (g != generation) {
                System.out.println(" ");
            return index;
            }
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

s 1:g.broken=trueはバリアが破られたことを示し、await()を呼び出すと異常が放出される.
スレッドが中断するとバリアを破り異常を放出し、
カウンタcountはindexに減算され、index=0はs 2に入り、そうでなければs 3に入る.
s 2:index=0は、ブロックされたスレッドの数がバリアポイントに達していることを示し、barrierActionが空でない場合はrunメソッドを直接呼び出し、先に実行させる.
nextGeneration()はtrip上で待機しているすべてのスレッドを呼び出し、countを初期値partiesに再割り当て、newはgenerationにGenerationを割り当てます.これにより、CyclicBarrierは元のように回復し、再使用され、0に戻ることができます.
s 3:count>0は、まだバリアポイントに達していないことを示し、for(;)に入るループは、バリアが破られるまでスレッドを条件キューtrip上で待機させます.バリアが破られるとgenerationが再付与され、起動されたスレッドは(g!=generation)という点で正常にループを終了します.
CyclicBarrierバリアが正常に破られた後にリセット操作が行われているので、CyclicBarrierは繰り返し使用できます.

Semaphore


小さな栗:
public class SemaphoreTest {
    private static final int tokenCount = 3;
    public static void main(String[] args) {
        final Semaphore tokens = new Semaphore(tokenCount); //  
        for(int i=0;i<10;i++)
            new Request(tokens).start();
    }

    static class Request extends Thread {
        private Semaphore tokens;

        public Request(Semaphore tokens) {
            this.tokens = tokens;
        }
        @Override
        public void run() {
            try {
                tokens.acquire();//  
                System.out.println(Thread.currentThread().getName()+": ...");
                Thread.sleep(3000);
                tokens.release();//  
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

出力:「Thread-0:リソースへのアクセス、Thread-1:リソースへのアクセス、Thread-2:リソースへのアクセス」待ち、そして「Thread-3:リソースへのアクセス、Thread-5:リソースへのアクセス、Thread-4:リソースへのアクセス」は、コンピュータの順序によって異なる場合があり、いずれも3つのグループへのアクセスであることがわかります.
Semaphore信号は、アクセス制限されたリソースのスレッド数を制限し、アクセスリソースのスレッド数を一定の値に調整するために使用される.ネットワークアプリケーションでは、トラフィックのピークからサーバを保護するためにストリームを制限し、ストリームを制限するためにトークンバケツアルゴリズムを使用し、Semaphoreはトークンバケツを実現することができます:アクセスラインは先にトークンを取得してからアクセスすることができ、アクセスが終わった後にトークンをバケツに返して他のスレッドのために使用することができ、アクセスリソースのスレッド数とトークン数を保証します.
Semaphoreは共有ロックであり、内部コードレイアウトはReentrantLockと同様であり、公平性設定をサポートし、公平性ロックに設定すれば、最も待ち時間のあるスレッドが先に信号を取得することができ、デフォルトは非公平性である.
public class Semaphore {
    /**   */
    private final Sync sync;
    /**  */
    abstract static class Sync extends AbstractQueuedSynchronizer {... ...}
    /**  */
    static final class NonfairSync extends Sync {}
    /**  */
    static final class FairSync extends Sync {}
    /**  */
    public MySemaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    /**  */
    public MySemaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
    ... ... 
}

ロック解除プロセスは、前の書き込みの共有ロックと一致します.異なるのは、信号取得の論理に再入力の処理がなく、1つのスレッドが複数回信号を取得することができ、取得するたびに総量が1減少することである.信号の解放時には信号の総量を制御していない.例えば、初期の信号は5個で、すでに5個を取得し、7個を解放した.この場合、利用可能な信号は7個である.つまり、解放時には信号の数を拡張することができ、使用中に信号の数を一定に保つ必要がある場合は、acquireとreleaseのペアが現れることを保証しなければならない.

小結

  • CountDownLatchとCyclicBarrierはいずれもカウンタの形式でスレッド同期を調整し、1つの顕著な違いはCyclicBarrierが再利用可能であり、CountDownLatchは使い捨てである.
  • CountDownLatchとCyclicBarrierのもう一つの意味的な違いは、Count DownLatchはスレッドが他のNスレッドの実行を待つことである.CyclicBarrierは,いずれも実行が完了するまでN個のスレッドが互いに待機している.
  • CountDownLatchは依存を強調し、CyclicBarrierは協力を強調した.典型的な適用シーンは、大きなタスクを小さなタスクに分解し、計算結果をマージすることです.たとえば、マルチスレッドが大きなファイルをダウンロードし、複数のダウンロードスレッドが自分で割り当てたファイルセグメントをダウンロードした後、マージスレッドはファイルマージ操作を開始します.
  • Semaphoreは、アクセス制限されたリソースのスレッド数を制限するために使用されます.典型的なアプリケーションシーンは、データベースの同時操作などのトラフィック制御です.データベース接続プールは10個しかありません.10個のスレッドしか接続できないことを保証しなければなりません.そうしないと、エラーが発生します.
  • Semaphoreは、例えば、RPCサーバが200 QPSしかサポートできない場合、要求RPCのスレッド数をSemaphoreで制限することができる簡単なサービスエンド制限ストリームを行うことができる.もちろん、複雑なサービス・エンド制限フローに対しては、より効率的なトークン・バケツまたはリーク・バケツアルゴリズムを使用する必要がある.

  • コードワードは容易ではありませんて、転載して原文の接続を保留してくださいhttps://www.jianshu.com/p/9e0ecc8b1358