マルチスレッド7 AQS

12576 ワード

一.概要AQS
AQSの概要
  • 同期コンポーネントの実装では、AQSはコア部分であり、同期コンポーネントの実装者は、AQSが提供するテンプレート法を用いる同期コンポーネントの意味
  • を実装する.
  • AQSは、同期状態の管理やブロックスレッドのキュー化、通知待ちなど一連の下位層の実装処理
  • を実現する.
  • AQSコア:Nodeを使用して同期キューを実現し、下位層は双方向チェーンテーブルであり、同期ロックまたは他の同期組立の基礎フレームワーク
  • に使用することができる.
    AbstractQueuedSynchronized、クラス名の先頭はAbstractですが、彼は抽象クラスではありません.意味は、単独で使用するのは意味がありません.彼に依存して同期コンポーネントを実現してこそ意味があります.テンプレートレスモードに相当します.
  • サブクラスは、そのステータスacquireおよびrelease
  • を管理する方法を継承し、実装する.
  • は同時に排他ロックと共有ロックを実現することができ、使用者の立場から見ると、それは私たちに2つのことを完成させることができ、独占制御と共有制御を完成させることができ、そのすべてのサブクラスの中でその独占機能を書き換えたAPIを実現するか、共有機能のAPIを使用するか、2つのAPIを同時に使用することはなく、彼の最も有名な実装クラスReentrantLockであっても、2つの内部クラスを通じて、それぞれ2セットのAPI
  • AQS実装の大まかな考え方は、内部に双方向のチェーンテーブルがあり、チェーンテーブルの各ノードはノードの構造であり、スレッドはロックを取得しようと試みるが、失敗した場合、現在のスレッドをノードにパッケージし、同期キューに参加し、前のノードがロックを解放した後、自分の後継ノードを呼び覚まし、先進的な先発(FIFO)に依存する待機キューのブロックロックや相関同期器のような設計目標は,単一原子int値に依存して状態を表すほとんどの同期器の有用な基礎となることである.
    以下のクラスはAQSに依存して実現されています.
  • ReentrantLock
  • ReentrantReadWritrLock.ReadLock
  • ReentrantReadWriteLock.WriteLock

  • 二.AQSを使って自分のロックを実現する
    自分のロックは必ずロックインタフェースを実現して、中の方法を書き直して、どのように書き直しますか?AQSを使用して、AQSを内部のヘルプクラスとして、中のtryAcquirとtryReleaseの方法を書き換えます.lock()はヘルプacquireの方法で実現します.この方法は少なくとも1回tryAcquireを呼び出します.同じように、ロックを解放します.私たちはヘルプのtryReleaseを書き直します.
  • ロックを取りに来たスレッドが前のロックを手に入れたスレッドであることをどうやって知ったの?
  • if(現在のスレッド==ロックを持つスレッド){state++;}要求カウンタ自己増
  • ロックを解除する方法
  • はn回繰り返してロックを受け取り、カウンタに
  • を順次減らすように要求した.
    
    public class AQSDemo01 implements Lock {
    private Helper helper = new Helper();
    
    private class Helper extends AbstractQueuedSynchronizer {
    
        @Override
        protected boolean tryAcquire(int arg) {
            int state = getState();
            Thread t = Thread.currentThread();
    
            if (state == 0) {
                //              ,             arg  ...... (      , compareAndSerState  ,               0        )
                if (compareAndSetState(0, arg)) {
                    System.out.println("    "+Thread.currentThread().getName()+"   arg=="+arg);
                  setExclusiveOwnerThread(t);
                    return true;
               }
            } else if (getExclusiveOwnerThread() == t) {
                setState(state + 1);
                return true;
            }
            return false;
        }
    
        @Override
        protected boolean tryRelease(int arg) {
    
            if (Thread.currentThread() != getExclusiveOwnerThread()) {
                throw new RuntimeException();
            }
            int state = getState() - arg;
    
            boolean flag = false;
    
            if (state == 0) {
                setExclusiveOwnerThread(null);
                flag = true;
    
            }
            setState(state);
            return flag;
        }
    
    
        Condition newCondition() {
            return new ConditionObject();
        }
    }
    
    @Override
    public void lock() {
        helper.acquire(1);
    }
    
    @Override
    public void lockInterruptibly() throws InterruptedException {
        helper.acquireInterruptibly(1);
    }
    
    @Override
    public boolean tryLock() {
        return helper.tryAcquire(1);
    }
    
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return helper.tryAcquireNanos(1, unit.toNanos(time));
    }
    
    @Override
    public void unlock() {
        helper.release(1);
    }
    
    @Override
    public Condition newCondition() {
        return helper.newCondition();
    }
    }

    テストクラスは次のとおりです.ロックは正常です.
    public class textAQS {
    private int value=0;
    private AQSDemo01 lock = new AQSDemo01();
    public int next() {
       lock.lock();
    
        try {
            Thread.sleep(300);
            return value++;
        } catch (InterruptedException e) {
            e.printStackTrace();
           throw new RuntimeException();
        } finally {
            lock.unlock();
        }
    }
    
    /*
     *              ,       , a()                    b(),        
     * */
    public void a() {
        lock.lock();
        System.out.println("a");
        b();
        lock.unlock();
    }
    
    public void b() {
        lock.lock();
        System.out.println("b");
        lock.unlock();
    }
    
    public static void main(String[] args) {
    
        textAQS m = new textAQS();
        //     
       new Thread(new Runnable() {
    
            @Override
            public void run() {
              m.a();
            }
        }).start();
    
        System.out.println("   =="+Thread.currentThread().getName());
    
        ExecutorService executorService = Executors.newCachedThreadPool();
    
    
        //            next  ,        ,  
         for (int i=0;i<4;i++){
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    while(true)
                        System.out.println(Thread.currentThread().getName()+"  "+m.next());
                }
            });
    
        }
    
    }
    
    }

    AQSの同期コンポーネント
    1. CountDownLatch
  • は、与えられたカウントでCountDownLatchを初期化する.countDown()メソッドが呼び出されるため、現在のカウントがゼロに達するまでawaitメソッドはブロックされます.一定回数呼び出されたCountDown()がカウンタの値がゼロの場合、待機中のスレッドはすべて解放され、awaitの後続の呼び出しはすべてすぐに返されます.この現象は一度しか現れない--カウントは
  • にリセットできない.
  • その典型的な使用シーンは分布計算
  • である.
  • 例えば
    課代表はすべての学生が宿題を出してから先生に渡します.1:課代表はすべての学生(スレッド)が宿題を出すのを待っています.CountDownLatch cdl=new CountDownLatch(int学生数)2:一人の学生が仕事をして、cdl.countDown()-->学生数を3:メインスレッド:cdl.await()学生数がゼロでない限り、待っています.
  • public class countDownLatch02 {
    private static int []nums;
    
    public countDownLatch02(int line){
        nums=new int[line];
    }
    
    //       ,        (     )   
    public  void colculate(String s ,int index,CountDownLatch count){
        System.out.println("        ..  "+Thread.currentThread().getName());
        String[] s1 = s.split(",");
        int total=0;
        for (String s2:s1) {
            int i = Integer.parseInt(s2);
            total+=i;
        }
        nums[index]=total;
    
        System.out.println(Thread.currentThread().getName() +"          =="+total);
        count.countDown();
    }
    
    
    //         
    public   void sum(){
        System.out.println("        ...");
        int total =0;
        for(int i=0;i contents = readFile();
        int size= contents.size();
        CountDownLatch c = new CountDownLatch(size);
    
        countDownLatch02 latch = new countDownLatch02(size);
    
        System.out.println("zhuxianc");
        // //          ,
        for(int i=0;i2){  //  ,         ..
            System.out.println("        "+Thread.activeCount());
        }*/
        c.await();
        latch.sum();
    }
    
    /*
     *     ,       list  ...
     * */
    public static List readFile() throws IOException {
        List list = new ArrayList<>();
        String line=null;
        BufferedReader bufferedReader = new BufferedReader(new FileReader("D:\\SETextMaven\\textcountDownLatch.txt"));
        while ((line = bufferedReader.readLine())!=null){
            list.add(line);
        }
        return list;
    }
    
    }

    2. Semaphore
  • は、プロジェクトで使用するデータベースの接続数など、限られたアクセスしか提供できないリソースによく使用されます.最大20しかないかもしれませんが、外部のコンカレント量は膨大なので、Semaphoreを使用して制御することができます.コンカレント数を1に制御すると、単一スレッドに似ています.
  • 彼は同じ時刻を簡単に制御することができて、ある資源の被のスレッド数に同時アクセスして、使用するのも簡単で、同時制御を行う必要があるコードに対してsemaphore.acquire()とsemaphore.release()を使います;包むと
  • です
    /*
    *     :             --> Semaphore               (      )     
    *   :                    ,              
    *   : Semaphore                  ,        ,               
    *     :
    *          30                   
    *         Semaphore  10         ,            
    * 
    *            
    * */
    public class semaphore {
    public static void main(String[] args) {
        final int tNum =30;
      // ExecutorService executorService;
      //  executorService = new Executors.newFixedThreadPool();
        Semaphore semaphore = new Semaphore(5);//                
    
        for (int i=0;i<100;i++){
            new Thread(()->{
                try {
                    semaphore.acquire();  //       Semaphore      
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+"    ");
    
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
                semaphore.release();       //      
    
            }).start();
        }
    }
    }

    3 . CyclicBarrier
  • とCountDownLanchは異なり、パラメータがpartiesであるすべてのスレッドが互いに待機するプロセス構築方法を記述している-barrierを起動する前にawait()のスレッド数
  • を呼び出さなければならない
  • 注意点:入力パラメータが6で、すべてのスレッドが5つしかない場合、プライマリスレッドとサブスレッドは、6番目のスレッドがawaitメソッドを実行していないため、常に待機状態になります.または、await()を実行する前に、スレッドに異常が発生し、バリアが
  • を満たすことはありません.
    インスタンスコード:
     
    /*
    *            ,            ,
    *     ,           ,-->                 -->        
    *
    *     :
    *                                       await()                      
    *                  ,-->           -->     ,  ....
    * */
    
    /*
    *       ,       ,        
    * */
    
    /*
    *     ...
    * */
    
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    
    public class CyclicBarrier01 {
    
    public void meeting(CyclicBarrier cyclicBarrier){
    System.out.println(Thread.currentThread().getName()+"     ...");
    
    try {
        cyclicBarrier.await();  //     ..
        System.out.println(Thread.currentThread().getName()+"    ..");
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (BrokenBarrierException e) {
        e.printStackTrace();
    }
    }
    
    public static void main(String[] args) {
    
        CyclicBarrier01 c = new CyclicBarrier01();
        //      1:
        //        CyclicBarrier,           (  )         ,
        //         barrier          。
        //  :
        //parties -     barrier       await()     
        //  :
        //IllegalArgumentException -    parties    1
        //   :
        //               6,          ,          ,         ,            await  ...
        //               ,      await()  ,    ,           
        CyclicBarrier cyclicBarrier = new CyclicBarrier(6);
          for (int i =0;i<5;i++) {  //      ...
              new Thread(new Runnable() {
                  @Override
                  public void run() {
                      c.meeting(cyclicBarrier);
                  }
              }).start();
          }
    
    
        //    
        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println("     ,  ...");
    
     //   cyclicBarrier.reset();
    }
    }
    

    Runableを持って、すべての予想されるスレッドがawaitになったら、まずRunableの中のタスクを実行します.
    public class CyclicBarrier02 {
    
    public void meeting(CyclicBarrier cyclicBarrier){
    System.out.println(Thread.currentThread().getName()+"     ...");
    
    try {
        cyclicBarrier.await();  //     ..
        System.out.println(Thread.currentThread().getName()+"     ...");
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (BrokenBarrierException e) {
        e.printStackTrace();
    }
    }
    
    public static void main(String[] args) {
    
        CyclicBarrier02 c = new CyclicBarrier02();
        //      2:
    /*
               CyclicBarrier,           (  )         ,
             barrier           ,           barrier      。
    
          :
        parties -     barrier       await()     
        barrierAction -     barrier       ;         ,      null
          :
    */
    
        CyclicBarrier cyclicBarrier = new CyclicBarrier(6, new Runnable() {
            @Override
            public void run() {
    
                System.out.println("    ...");
            }
        });
          for (int i =0;i<5;i++) {  //      ...
              new Thread(new Runnable() {
                  @Override
                  public void run() {
                      c.meeting(cyclicBarrier);
                  }
              }).start();
          }
        //    
        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println("     await  ...        ,  ...");
        cyclicBarrier.reset();
    
    }
    }
    

    4.J.U.C同期コンポーネントFutrueTask
    マルチスレッド2の基本スキルについて詳しく説明し、使用します.
    5 J.U.C同期コンポーネントFork/Joinフレームワーク
  • ForkJoinはjava 7が提供する並列実行タスクのフレームワーク
  • である.
  • その設計構想は、1人の大物Forkをいくつかの小さなタスクに分散して計算し、Joinこれらのサブタスクの結果を得て、最終的にこの大きなタスクの結果を得て、あるスレッドを使用して他のスレッドのワークキュー(両端キュー、盗まれたスレッドがこのキューの末尾からタスクを取り、競争を減らす)中盗みタスク実行
  • 制限:
  • 両端タスクキューに1つのタスクがあるとすれば、競争が発生し、スレッドを開くコストが
  • もあるに違いない.
  • はForkとJoinを使用してしか同期できません.他の同期メカニズムが使用されている場合、同期スレッドは他のタスク
  • を盗み取ることはできません.
  • ワークキュー内のタスクはIOオペレーション
  • ではない
  • タスクはチェック異常を投げ出すことができず、必要なコードで
  • を処理する必要があります.