高度なJAVA-ハイパラレルインタフェースストリーム制限Semaphore

10438 ワード

Semaphoreの紹介
Semaphoreは、JDK 1.5のjava.util.concurrentと発注書で提供される同時ツールクラスです.
Semaphoreは文字通り信号量ですが、個人的には分かりやすい言い方はライセンスマネージャだと思います
公式の解釈は
  • Semaphoreはカウント信号量
  • である
  • 概念的には、Semaphoreはライセンス
  • のセットを含む
  • 必要に応じてacquire()メソッドを呼び出すたびに、利用可能なライセンス
  • が取得されるまでブロックされます.
  • release()メソッドを呼び出すたびに、ライセンスを持つスレッドが解放され、使用可能なライセンス
  • がSemaphoreに返却される.
  • 実際にはスレッドで使用するための実際のライセンスオブジェクトはありません.Semaphoreは、使用可能な数の管理メンテナンス
  • にすぎません.
    Semaphore内部は主にAQS(AbstractQueuedSynchronizer)によってスレッドの管理を実現する.
    スレッドは実行時にまずライセンスを取得し、成功すればライセンス数が1減少し、スレッドが実行されます.スレッドの実行が完了するとライセンスが解放され、ライセンス数は1に加算されます.ライセンス数が0の場合、取得に失敗し、スレッドはAQSの待機キューにあり、他のライセンスを解放したスレッドによって起動します.
    Semaphoreのコンストラクタ
    Semaphore(int permits) :            Semaphore
    Semaphore(int permits, boolean fair) :            Semaphore ,          ( FIFO      )

    Semaphoreのよく使われるいくつかの方法
    boolean isFail() :   Semaphore       
    int availablePermits() :   Semaphore        
    boolean hasQueuedThreads() :     Semaphore               
    int getQueueLength() :     Semaphore             
    
    void acquire() throws InterruptedException :              ,          ,         ,           InterruptedException ,          
    void acquire(int permits) throws InterruptedException :                ,            ,         ,
              InterruptedException ,          
    boolean tryAcquire() :     1       ,      ,              ,            true .                ,    false .        tryAcquire(int permits)
    boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException :           1       ,             true ,             false,                 ,       tryAcquire(int permits, long timeout, TimeUnit unit)
    void acquireUninterruptibly() :          1       ,        acquireUninterruptibly(int permits)
    int drainPermits() :             
    
    void release() :   1        ,     acquire(int permits)

    Semaphoreのコードプレゼンテーション
    package com.xing.apiLimiting;
    
    import lombok.AllArgsConstructor;
    import org.apache.commons.lang3.RandomUtils;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;
    
    /**
     * @title    Semaphore
     * @author Xingbz
     * @createDate 2018-12-7
     */
    public class SemaphoreDemo {
        public static void main(String[] args) throws Exception {
    //        demo1();//Semaphore     /      /       
    //        demo2();//        /  /  
    //        demo3();//    
        }
    
        /**
         * @title Semaphore     /      /       
         * @author Xingbz
         */
        private static void demo1() throws Exception {
            Semaphore semaphore1 = new Semaphore(5);//           Semaphore
            Semaphore semaphore2 = new Semaphore(5, true);//           Semaphore ,         (FIFO)
            System.out.println("       : semaphore1 " + semaphore1.isFair() + "; semaphore2 " + semaphore2.isFair());//      FIFO
    
            semaphore2.acquire();//            
            System.out.println("    1       ;           : " + semaphore2.availablePermits());
    
            semaphore2.release();//  1    
            System.out.println("    1     ;           : " + semaphore2.availablePermits());
    
            semaphore2.acquire(2);//    N       
            System.out.println("    2       ;           : " + semaphore2.availablePermits());
    
            semaphore2.release(2);//  N     //       3 (>2) ,           6 ?
            System.out.println("    2     ;           : " + semaphore2.availablePermits() + "
    "); } /** * @title / / * @author Xingbz */ private static void demo2() throws Exception { Semaphore semaphore = new Semaphore(5, true); new Thread(() -> {// System.out.println(Thread.currentThread().getName() + " . . ."); int permits = semaphore.drainPermits();// System.out.println(Thread.currentThread().getName() + " " + permits + " ; : " + semaphore.availablePermits()); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } semaphore.release(permits); System.out.println(Thread.currentThread().getName() + " " + permits + " ; : " + semaphore.availablePermits()); }).start(); Thread.sleep(100); System.out.println("
    [1] : " + semaphore.availablePermits()); System.out.println("[1] : " + semaphore.hasQueuedThreads()); System.out.println("[1] : " + semaphore.getQueueLength()); new Thread(() -> {// 1 try { System.out.println(Thread.currentThread().getName() + " 1 . . ."); semaphore.acquire();// , , 1 System.out.println(Thread.currentThread().getName() + " 1 ; : " + semaphore.availablePermits()); } catch (InterruptedException e) { e.printStackTrace(); } semaphore.release(); System.out.println(Thread.currentThread().getName() + " 1 ; : " + semaphore.availablePermits()); }).start(); Thread.sleep(100); System.out.println("
    [2] : " + semaphore.availablePermits()); System.out.println("[2] : " + semaphore.hasQueuedThreads()); System.out.println("[2] : " + semaphore.getQueueLength()); new Thread(() -> {// 2 try { System.out.println(Thread.currentThread().getName() + " 2 . . ."); semaphore.acquire(2); System.out.println(Thread.currentThread().getName() + " 2 ; : " + semaphore.availablePermits()); } catch (InterruptedException e) { e.printStackTrace(); } semaphore.release(2); System.out.println(Thread.currentThread().getName() + " 2 ; : " + semaphore.availablePermits()); }).start(); Thread.sleep(100); System.out.println("
    [3] : " + semaphore.availablePermits()); System.out.println("[3] : " + semaphore.hasQueuedThreads()); System.out.println("[3] : " + semaphore.getQueueLength()); Thread.sleep(6000); System.out.println("
    [ ] : " + semaphore.availablePermits()); System.out.println("[ ] : " + semaphore.hasQueuedThreads()); System.out.println("[ ] : " + semaphore.getQueueLength()); } private static final ExecutorService executorService = Executors.newCachedThreadPool(); /** * @title * @description * : * 1 . * 2 . 2 * permits=2 Semaphore * 3 . 20 * Semaphore (FIFO) , 20 * 4 . , * 5 . * acquireUninterruptibly() , * 6 . , , , * tryAcquire(timeout,TimeUnit) , * 7 . , , , * acquire() , * @author Xingbz */ private static void demo3() throws Exception { Semaphore semaphore = new Semaphore(2, true); List futureList = new ArrayList<>(5);// , for (int i = 1; i <= 20; i++) { if (i <= 10) {// 10 executorService.execute(new Car(" " + i, semaphore, 1)); } else if (i <= 15) {// 5 , executorService.execute(new Car(" " + i, semaphore, 2)); } else {// 5 Future future = executorService.submit(new Thread(new Car(" " + i, semaphore, 3))); futureList.add(future); } } Thread.sleep(10000);// 10S , System.out.println(" , "); futureList.forEach(f -> f.cancel(true)); } @AllArgsConstructor static class Car implements Runnable { /** */ private String carNo; /** */ private Semaphore semaphore; /** : 1 ; 2 ; 3 */ private int type; /** * @title * @author Xingbz */ @Override public void run() { switch (type) { case 1: // , semaphore.acquireUninterruptibly(); try { Thread.sleep(RandomUtils.nextLong(3000, 4000));// . . . } catch (InterruptedException e) { } semaphore.release();// , System.out.println(carNo + " . "); break; case 2:// , 1s , try { if (semaphore.tryAcquire(RandomUtils.nextInt(6000, 11000), TimeUnit.MILLISECONDS)) { Thread.sleep(RandomUtils.nextLong(3000, 4000));// . . . semaphore.release();// , System.out.println(carNo + " . "); } else {// System.out.println(carNo + " . "); } } catch (InterruptedException e) { } break; case 3:// try { semaphore.acquire(); Thread.sleep(RandomUtils.nextLong(3000, 4000));// . . . semaphore.release(); System.out.println(carNo + " . "); } catch (InterruptedException e) {// , System.out.println(carNo + " , . "); } } } } }