ブロックキューソースの概要


文書ディレクトリ
  • 概要
  • ソースコード
  • 問題
  • を使用
  • 候補
  • 参照
  • 概要
  • キューをブロックする利点:キューがいっぱいになると、コードでキューへのデータの格納を禁止する必要がなくなり、キューが空でない場合、消費者の消費も禁止され、使用に便利性と安全性を提供します.
  • ブロックキューの種類:
  • ArrayBlockingQueue:配列構造からなる境界ブロックキュー.
  • LinkedBlockingQueue:チェーンテーブル構造からなる境界ブロックキュー.
  • PriorityBlockingQueue:優先順位付けをサポートする無境界ブロックキュー.
  • DelayQueue:優先キューを使用して実現される無境界ブロックキュー.
  • SynchronousQueue:要素を格納しないブロックキュー.
  • LinkedTransferQueue:チェーンテーブル構造からなる無境界ブロックキュー
  • LinkedBlockingDeque:チェーンテーブル構造からなる双方向ブロックキュー.

  • のいくつかの方法:
  • 増加(キューの最後にのみ要素を追加):add、put、offer
  • キューヘッダ(キューヘッダでのみ要素を除去):peek、poll、take
  • 削除:remove(キューで要素を削除)、remove(object o)(キュー内の要素を削除)

  • ソースコード
  • この節では、主にListタイプのブロックキュー
  • について説明する.
  • キューとコレクション->Array構造
  • 同じ点:いずれも線形構造であり、下位実装Object配列
  • の違い:
  • セットの各位置は要素を追加して削除することができ、キューは一端に挿入して一端に削除するしかなく、先進的な先出しの線形構造であり、キューは先進的な先出しであるため、キューの尾に挿入し、キューの頭部に除去する.
  • コレクションからエレメントを除去すると、一部の場所のエレメントindexが変化する可能性がありますが、キュー内のエレメントの一端での除去と増加は、他の場所のindexに影響しません.つまり、コレクションからエレメントを除去すると、データが移動する可能性があります.例えばList[1,2,3]、要素2を除去した結果List[1,3]、要素3のindexが変化する.Deque[1,2,3],1つ(1のセグメントからしか除去できない要素)Deque[null,2,3]を除去し,要素中のindexは変化しなかった.
  • の集合とキュー削除のある要素の実現原理はほぼ同じであり、remove(object)は、配列中の要素を移動する必要がある.


  • 実装原理構想:要素を格納する構造はObject配列であり、キューの実装配列では、キューは循環する配列であり、データを挿入したり削除したりする際のインデックスと配列の長さの数値が同時に、インデックスを初期値にリセットする必要がある.
  • 概要:一端を挿入して削除し、限られた空間内で次の挿入データの位置と削除データの位置を知るために、キューの実装では挿入要素の位置と除去要素の位置を記録する必要がある.キューに空と満がある場合もあるので、キューに要素が存在する個数も知る必要があります.
  • 追加実装:
  • 現在の要素をキューの末尾に挿入します.キューの末尾は配列内の任意の位置であることができます.
  • キューから要素を取得する位置は挿入位置とは正反対であり、キューから要素がキューのヘッダに除去され、要素がキューの末尾に増加するため、キューは循環構造であり、配列がいっぱいになると、キュー内の要素が消費されるのを待つ必要があり、消費される要素はキューのヘッダのみであり、したがって、キューがいっぱいになると、挿入位置のインデックスをキューのヘッダ
  • に設定する必要がある.
  • 削除の実装:ヘッダインデックスのデータを削除し、キューが空の場合、待機し、データを削除すると、キューの数は
  • 減少する.
  • remove(Object):キュー内でエレメントを検索し、見つかった場合はエレメントを削除し、その後のエレメントを移動します.見つからない場合は削除しません.

  • グローバル変数
  • final nal Object[] items;
    int takeIndex;//             
    int putIndex;//              
    int count;
    final ReentrantLock lock;
     /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts          ,                ,
    *       ,         ,     (   +CPU            )
    */
    private final Condition notFull;
    transient Itrs itrs = null;  //   ,          ,        ,                 
    
  • take pullメソッド
  • public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();//lock    ,   A  take        ,    A       ,     
            try {
                while (count == 0)//       ,          
                    notEmpty.await();
                return dequeue(); //    
            } finally {
                lock.unlock();
            }
    }
    
    public E poll() {//remove()     -> remove() remove(Object o  )
         final ReentrantLock lock = this.lock;
         lock.lock();
         try {
             return (count == 0) ? null : dequeue();
         } finally {
             lock.unlock();
         }
     }
    
    private E dequeue() {
            final Object[] items = this.items
            @SuppressWarnings("unchecked")
            E x = (E) items[takeIndex];//        
            items[takeIndex] = null;//            null
            if (++takeIndex == items.length)//         ,               ,                
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
            notFull.signal();//             
            return x;
    }
    
    
  • offer put
  • 
    public boolean offer(E e) {//add     
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
    public void put(E e) throws InterruptedException {
           checkNotNull(e);
           final ReentrantLock lock = this.lock;
           lock.lockInterruptibly();
           try {
               while (count == items.length)//     ,          
                   notFull.await();
               enqueue(e);
           } finally {
               lock.unlock();
           }
    }
    
    private void enqueue(E x) {
          // assert lock.getHoldCount() == 1;
          // assert items[putIndex] == null;
          final Object[] items = this.items;
          items[putIndex] = x;//    
          if (++putIndex == items.length)//               ,             
              putIndex = 0;
          count++;
          notEmpty.signal();//       
    }
    
    
  • remove(object)
  • public boolean remove(Object o) {
            if (o == null) return false;
            final Object[] items = this.items;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (count > 0) {//       
                    final int putIndex = this.putIndex;/
                    int i = takeIndex;
                    //                      ,    ,        
                    do {
                        if (o.equals(items[i])) { //                 ,
                            removeAt(i);//        
                            return true;
                        }
                        if (++i == items.length)
                            i = 0;
                    } while (i != putIndex);
                }
                return false;
            } finally {
                lock.unlock();
            }
    }
    
    void removeAt(final int removeIndex) {
    
           final Object[] items = this.items;
           if (removeIndex == takeIndex) { //            
               // removing front item; just advance
               items[takeIndex] = null;
               if (++takeIndex == items.length)
                   takeIndex = 0;
               count--;
               if (itrs != null)
                   itrs.elementDequeued();
           } else {
               //         ,      ,        takeIndex putIndex
               final int putIndex = this.putIndex;
               for (int i = removeIndex;;) {
                   int next = i + 1;
                   if (next == items.length)//index          
                       next = 0;
                   if (next != putIndex) {//    
                       items[i] = items[next];
                       i = next;
                   } else {//      
                       items[i] = null;
                       this.putIndex = i;
                       break;
                   }
               }
               count--;
               if (itrs != null)
                   itrs.removedAt(removeIndex);
           }
           notFull.signal();
       }
    
    

    に質問
  • ブロッキングキューは、なぜロックを追加して追加および削除操作を実現するのですか?
  • スレッドセキュリティの問題が発生し、同じ位置に複数の生産者が製品を生産する場合に代替操作が発生し、同じ位置に複数の消費がある場合に重複消費の問題が発生する
  • .
  • ArrayDequeとwait、notify、notifyAllによってカスタマイズされたブロックキューが存在する場合、複数の生産者(p 1,p 2,p 3)と複数の消費者(c 1,c 2,c 3)が存在し、ロック対象はそのArrayDequeキューであり、ある時点でp 1が製品を生産した後、現在のキューがいっぱいになった場合、notifyメソッドを呼び出すとそのスレッドが呼び出されますか?もし生産者を呼び覚ますならば、生産者は製品を消費して、もし消費者を呼び覚ますならば?もし消費者の数が多いならば、notifyは多くの消費者を呼び覚まして1人の消費者を呼び覚まさないのではないでしょうか.notifyは、そのスレッドを起動すべきことを指定していません.
  • notify起動スレッドは、どちらを指すのではなく、ブロック内のすべてのスレッドであり、どちらを指定することもできません.

  • ブロックキューで反復器を実現し、要素を除去する際に反復器の中の要素を操作し、リストとArrayBlockingQueueの反復器実現を時間を見つけてまとめるつもりです.if (itrs != null) itrs.removedAt(removeIndex);のセグメントは、反復器の要素を操作します.

  • 使用
    public class Test {
        private int queueSize = 10;
        private ArrayBlockingQueue queue = new ArrayBlockingQueue(queueSize);
         
        public static void main(String[] args)  {
            Test test = new Test();
            Producer producer = test.new Producer();
            Consumer consumer = test.new Consumer();
             
            producer.start();
            consumer.start();
        }
         
        class Consumer extends Thread{
             
            @Override
            public void run() {
                consume();
            }
             
            private void consume() {
                while(true){
                    try {
                        queue.take();
                        System.out.println("         ,    "+queue.size()+"   ");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
         
        class Producer extends Thread{
             
            @Override
            public void run() {
                produce();
            }
             
            private void produce() {
                while(true){
                    try {
                        queue.put(1);
                        System.out.println("           ,      :"+(queueSize-queue.size()));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    

    候補者
    wait、notify、await、signalの思考:
  • wait,awaitは,ロックを取得したスレッドをブロック状態にしながらロックを解放し,cpuリソースを解放し,その後のスレッドは起動されなければ使用できない.
  • notify,signalは起動待ちスレッドの1つのスレッドであり、あるスレッドは、実際にはすべての待機スレッドがキューに与えるように形成されているので、起動スレッドがキューから取得する、すなわち秩序化されており、公平ロックと非公平ロックは主にキューを形成する際の違い
  • である.
  • よく観察すると、ブロックされたキューには2つのロックが存在し、キューの一方が入って、一方が出ているので、それぞれ出入り時に2つのロックがあります.この2つのロックの目的は、スレッドを起動するときに指向していることです.キューに入ると、スレッドがいっぱいで、入ることができません.キューを起動するスレッドは、キューの要素を消費します.逆にキューからキューに通知する生産要素~
  • .
    リファレンス
    https://www.cnblogs.com/dolphin0520/p/3932906.html https://blog.csdn.net/u013991521/article/details/53068713