JavaブロッキングキューArrayBlockingQueueの使用と原理分析

8950 ワード

ブロッキングキューとは?
ブロックキュー(BlockingQueue)は、2つの追加操作をサポートするキューです.この2つの追加操作は、キューが空の場合、要素を取得するスレッドがキューが空になるのを待つことです.キューがいっぱいになると、要素を格納するスレッドがキューが使用可能になるのを待ちます.キューをブロックするのは、生産者や消費者のシーンでよく使われます.生産者はキューに要素を追加するスレッドで、消費者はキューから列に要素を持つスレッド.ブロックキューは生産者が元素を保管する容器であり、消費者も容器から元素だけを受け取る.
方法
異常を投げ出す
特殊値を返す
ずっと塞いで
タイムアウト終了
挿入
add(e)
offer(e)
put(e)
offer(e, time, unit)
削除
remove()
poll()
take()
poll(time, unit)
けんさ
element()
peek()
使用不可
使用不可
  • 放出異常:ブロックされたキューがいっぱいになったときにキューに要素を挿入すると、IllegalStateException(「Queue full」)異常が放出されます.キューが空の場合、キューから要素を取得すると、NoSuchElementException例外が放出されます.
  • は、特殊な値を返します.挿入メソッドは成功を返し、成功するとtrueを返します.除去方法は、キューから要素を取り出し、そうでなければnull
  • を返す.
  • は常にブロックされています.ブロックされたキューがいっぱいになると、生産者スレッドがキューにput要素を入力すると、キューはデータを取得するまで生産者スレッドをブロックしたり、割り込みに応答して終了したりします.キューが空の場合、消費者スレッドはキューからtake要素を試み、キューが使用可能になるまで消費者スレッドをブロックします.
  • タイムアウト終了:ブロックキューがいっぱいになると、キューは生産者スレッドをブロックし、一定の時間を超えると、生産者スレッドは終了します.

  • コアメンバー変数
        // ArrayBlockingQueue           
        final Object[] items;
    
        /** items index for next take, poll, peek or remove */
        int takeIndex;
    
        /** items index for next put, offer, or add */
        int putIndex;
    
        /** Number of elements in the queue */
        int count;
    
        /*
         * Concurrency control uses the classic two-condition algorithm
         * found in any textbook.
         */
    
        /** Main lock guarding all access */
        final ReentrantLock lock;
    
        /** Condition for waiting takes */
        private final Condition notEmpty;
    
        /** Condition for waiting puts */
        private final Condition notFull;
    

    コンストラクション関数
    //           ,       
    public ArrayBlockingQueue(int capacity) {
            this(capacity, false);
    }
    
    //       、       
    public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];
            //      
            lock = new ReentrantLock(fair);
            //         
            notEmpty = lock.newCondition();
            //         
            notFull =  lock.newCondition();
    }

    チームに入る
  • offer.trueが正常に返され、false
  • が失敗しました.
    public boolean offer(E e) {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            //      
            lock.lock();
            try {
                //     ,    false
                if (count == items.length)
                    return false;
                else {
                //     
                    enqueue(e);
                    return true;
                }
            } finally {
                //    
                lock.unlock();
            }
    }
    
  • put:キューがいっぱいで、ストレージ要素を呼び出すスレッド
  • がブロックされます.
    public void put(E e) throws InterruptedException {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == items.length)
                    notFull.await(); //     ,         
                enqueue(e);
            } finally {
                lock.unlock();
            }
    }
  • enqueue:エンキュー操作
  • private void enqueue(E x) {
            // assert lock.getHoldCount() == 1;
            // assert items[putIndex] == null;
            final Object[] items = this.items;
            items[putIndex] = x;
            if (++putIndex == items.length)
                putIndex = 0;
            count++;
            //        。           。
            notEmpty.signal();
    }
  • poll:取得要素、存在戻り要素e、存在戻りnull
  •     public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return (count == 0) ? null : dequeue();
            } finally {
                lock.unlock();
            }
        }
  • take:要素を取ります.キューが空の場合、取得要素を呼び出すスレッドがブロックされます.
  •     public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0)
                    notEmpty.await();
                return dequeue();
            } finally {
                lock.unlock();
            }
        }
  • dequeue:デキュー操作
  • private E dequeue() {
            // assert lock.getHoldCount() == 1;
            // assert items[takeIndex] != null;
            final Object[] items = this.items;
            @SuppressWarnings("unchecked")
            E x = (E) items[takeIndex];
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
            //          ,               
            notFull.signal();
            return x;
    }

    最後に書く
           JDK8  。