JAVA同時プログラミング---ArrayBlockingQueueソースの詳細


一、概念
ArrayBlockingQueueは、最下位の配列で境界キューを実現し、FIFOのルールに従って要素をソートします.デフォルトではスレッドの公平なアクセスキューは保証されません.
二、ソース分析
基本プロパティ:
 /**        */
 final Object[] items;
 /**            ,  take,poll,peek,remove    */
 int takeIndex;
 /**            ,     put,offer,or add     */
 int putIndex;
 /**           */
 int count;
 /*
  * Concurrency control uses the classic two-condition algorithm
  * found in any textbook.
  */
 /** ReentrantLock         */
 final ReentrantLock lock;
 /**  2     ,                  */
 private final Condition notEmpty;
 /** Condition for waiting puts */
 private final Condition notFull;
 /**       */
 transient Itrs itrs = null;
//    fair true      
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
 	

方法:
add():
キューの長さを超えずに要素を挿入すると、すぐに実行でき、trueに戻ることに成功します.キューがいっぱいになると異常が放出され、その下位層はofferメソッドを実現し、ブロックされません.テスト:
private static final int CLEAN_QUEUE_MAX_SIZE = 50;
  @Test()
    public  void test1(){
        BlockingQueue<Integer> arrayBlockingQueue = Queues.newArrayBlockingQueue(CLEAN_QUEUE_MAX_SIZE); 
        IntStream.rangeClosed(0,50).forEach( its -> assertThat(arrayBlockingQueue.add(its), equalTo(true)));
    }

ソース:
//      AbstractQueue add()  
 public boolean add(E e) {
        return super.add(e);
    }
 
 // AbstractQueue  add      offer(e) offer(e)     ArrayBlockingQueue 
 public boolean add(E e) {
     if (offer(e))
         return true;
     else
         throw new IllegalStateException("Queue full");
 }

offer(E e):
キューの長さを超えずに要素を挿入すると、すぐにキューの末尾に指定した要素を挿入し、正常にtrueを返し、キューがいっぱいになったらfalseを返します.ブロックされません.
private static void checkNotNull(Object v) {
    if (v == null)
        throw new NullPointerException();
}
 
 public boolean offer(E e) {
     checkNotNull(e);
     final ReentrantLock lock = this.lock;
     lock.lock();
     try {
         if (count == items.length)
             return false;
         else {
             enqueue(e);
             return true;
         }
     } finally {
         lock.unlock();
     }
 }

 //              
 private void enqueue(E x) {
     // assert lock.getHoldCount() == 1;
     // assert items[putIndex] == null;
     final Object[] items = this.items;
     items[putIndex] = x;
     //                  ,    [0],    +1
     if (++putIndex == items.length)
         putIndex = 0;
     count++;
      //    notEmpty        ,       take     
     notEmpty.signal();
 }

offer(E e, long timeout, TimeUnit unit):
 public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        //       
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
             //                     false,      ,finally     
                if (nanos <= 0)
                    return false;
                 //        
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

put(E e):
BlockQueueにスペースがない場合、このメソッドを呼び出すスレッドはBlockingQueueの中にスペースがあるまでブロックされ、ブロックされ、割り込みに応答することができます.テスト:
 ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
 executorService.schedule( () -> {
     try {
         assertThat(arrayBlockingQueue.take(), equalTo(1));
     } catch (InterruptedException e) {
         e.printStackTrace();
     }
 },3, TimeUnit.SECONDS);
 executorService.shutdown();
 IntStream.rangeClosed(1,51).forEach( its -> {
     try {
         arrayBlockingQueue.put(its);
         System.out.println(its);
     } catch (InterruptedException e) {
         e.printStackTrace();
     }
 });
 public void put(E e) throws InterruptedException {
     checkNotNull(e);
     final ReentrantLock lock = this.lock;
     //             await()  , await()             InterruptedException     ,        
     //                ,         InterruptedException  ,         
     lock.lockInterruptibly();
     try {
         while (count == items.length)
         	//            ,  notFull signal     ,          
             notFull.await();
         enqueue(e);
     } finally {
         lock.unlock();
     }
 }

生産ではofferを使用してブロックによるバグを回避することが多い
take()
takeメソッドもキュー内のデータを取得するために使用されますが、pollメソッドとは異なり、現在のスレッドがブロックされ、中断する可能性があります.
 public E take() throws InterruptedException {
     final ReentrantLock lock = this.lock;
     lock.lockInterruptibly();
     try {
         while (count == 0)
          //       ,        
             notEmpty.await();
          //         ,  dequeue       
         return dequeue();
     } finally {
         lock.unlock();
     }
 }
 private E dequeue() {
     // assert lock.getHoldCount() == 1;
     // assert items[takeIndex] != null;
     final Object[] items = this.items;
     @SuppressWarnings("unchecked")
     E x = (E) items[takeIndex];
     //              null    
     items[takeIndex] = null;
     //          ,takeIndex   [0],            ,         
     if (++takeIndex == items.length)
         takeIndex = 0;
     count--;
     if (itrs != null)
         itrs.elementDequeued();
     //    notFull            
     notFull.signal();
     return x;
 }

 void elementDequeued() {
     // assert lock.getHoldCount() == 1;
     if (count == 0)
         queueIsEmpty();
     else if (takeIndex == 0)
         takeIndexWrapped();
 }

poll()
カラムが空でない場合は、キューヘッダを返して削除します.キューが空の場合nullが返されます.非ブロックはすぐに戻ります.
 public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
        	// count 0  null,            
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }

peek()
peekメソッドは本当にキューから要素を削除するのではなく、実際にはヘッダ要素を取り出すだけです.
 public E peek() {
     final ReentrantLock lock = this.lock;
     lock.lock();
     try {
         return itemAt(takeIndex); // null when queue is empty
     } finally {
         lock.unlock();
     }
 }
 final E itemAt(int i) {
    return (E) items[i];
}

drainTo(Collection super E> c)
BlockingQueueから利用可能なすべてのデータオブジェクトを一度に取得し(取得データの個数を指定することもできる)、この方法により、取得データの効率を向上させることができる.複数のバッチロックやロックの解除は必要ありません.