JAVA同時プログラミング---ArrayBlockingQueueソースの詳細
33391 ワード
一、概念
ArrayBlockingQueueは、最下位の配列で境界キューを実現し、FIFOのルールに従って要素をソートします.デフォルトではスレッドの公平なアクセスキューは保証されません.
二、ソース分析
基本プロパティ:
方法:
add():
キューの長さを超えずに要素を挿入すると、すぐに実行でき、trueに戻ることに成功します.キューがいっぱいになると異常が放出され、その下位層はofferメソッドを実現し、ブロックされません.テスト:
ソース:
offer(E e):
キューの長さを超えずに要素を挿入すると、すぐにキューの末尾に指定した要素を挿入し、正常にtrueを返し、キューがいっぱいになったらfalseを返します.ブロックされません.
offer(E e, long timeout, TimeUnit unit):
put(E e):
BlockQueueにスペースがない場合、このメソッドを呼び出すスレッドはBlockingQueueの中にスペースがあるまでブロックされ、ブロックされ、割り込みに応答することができます.テスト:
生産ではofferを使用してブロックによるバグを回避することが多い
take()
takeメソッドもキュー内のデータを取得するために使用されますが、pollメソッドとは異なり、現在のスレッドがブロックされ、中断する可能性があります.
poll()
カラムが空でない場合は、キューヘッダを返して削除します.キューが空の場合nullが返されます.非ブロックはすぐに戻ります.
peek()
peekメソッドは本当にキューから要素を削除するのではなく、実際にはヘッダ要素を取り出すだけです.
drainTo(Collection super E> c)
BlockingQueueから利用可能なすべてのデータオブジェクトを一度に取得し(取得データの個数を指定することもできる)、この方法により、取得データの効率を向上させることができる.複数のバッチロックやロックの解除は必要ありません.
ArrayBlockingQueueは、最下位の配列で境界キューを実現し、FIFOのルールに従って要素をソートします.デフォルトではスレッドの公平なアクセスキューは保証されません.
二、ソース分析
基本プロパティ:
/** */
final Object[] items;
/** , take,poll,peek,remove */
int takeIndex;
/** , put,offer,or add */
int putIndex;
/** */
int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** ReentrantLock */
final ReentrantLock lock;
/** 2 , */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
/** */
transient Itrs itrs = null;
// fair true
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
方法:
add():
キューの長さを超えずに要素を挿入すると、すぐに実行でき、trueに戻ることに成功します.キューがいっぱいになると異常が放出され、その下位層はofferメソッドを実現し、ブロックされません.テスト:
private static final int CLEAN_QUEUE_MAX_SIZE = 50;
@Test()
public void test1(){
BlockingQueue<Integer> arrayBlockingQueue = Queues.newArrayBlockingQueue(CLEAN_QUEUE_MAX_SIZE);
IntStream.rangeClosed(0,50).forEach( its -> assertThat(arrayBlockingQueue.add(its), equalTo(true)));
}
ソース:
// AbstractQueue add()
public boolean add(E e) {
return super.add(e);
}
// AbstractQueue add offer(e) offer(e) ArrayBlockingQueue
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
offer(E e):
キューの長さを超えずに要素を挿入すると、すぐにキューの末尾に指定した要素を挿入し、正常にtrueを返し、キューがいっぱいになったらfalseを返します.ブロックされません.
private static void checkNotNull(Object v) {
if (v == null)
throw new NullPointerException();
}
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
//
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
// , [0], +1
if (++putIndex == items.length)
putIndex = 0;
count++;
// notEmpty , take
notEmpty.signal();
}
offer(E e, long timeout, TimeUnit unit):
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
//
lock.lockInterruptibly();
try {
while (count == items.length) {
// false, ,finally
if (nanos <= 0)
return false;
//
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
put(E e):
BlockQueueにスペースがない場合、このメソッドを呼び出すスレッドはBlockingQueueの中にスペースがあるまでブロックされ、ブロックされ、割り込みに応答することができます.テスト:
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
executorService.schedule( () -> {
try {
assertThat(arrayBlockingQueue.take(), equalTo(1));
} catch (InterruptedException e) {
e.printStackTrace();
}
},3, TimeUnit.SECONDS);
executorService.shutdown();
IntStream.rangeClosed(1,51).forEach( its -> {
try {
arrayBlockingQueue.put(its);
System.out.println(its);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// await() , await() InterruptedException ,
// , InterruptedException ,
lock.lockInterruptibly();
try {
while (count == items.length)
// , notFull signal ,
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
生産ではofferを使用してブロックによるバグを回避することが多い
take()
takeメソッドもキュー内のデータを取得するために使用されますが、pollメソッドとは異なり、現在のスレッドがブロックされ、中断する可能性があります.
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
// ,
notEmpty.await();
// , dequeue
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
// null
items[takeIndex] = null;
// ,takeIndex [0], ,
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// notFull
notFull.signal();
return x;
}
void elementDequeued() {
// assert lock.getHoldCount() == 1;
if (count == 0)
queueIsEmpty();
else if (takeIndex == 0)
takeIndexWrapped();
}
poll()
カラムが空でない場合は、キューヘッダを返して削除します.キューが空の場合nullが返されます.非ブロックはすぐに戻ります.
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// count 0 null,
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
peek()
peekメソッドは本当にキューから要素を削除するのではなく、実際にはヘッダ要素を取り出すだけです.
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
final E itemAt(int i) {
return (E) items[i];
}
drainTo(Collection super E> c)
BlockingQueueから利用可能なすべてのデータオブジェクトを一度に取得し(取得データの個数を指定することもできる)、この方法により、取得データの効率を向上させることができる.複数のバッチロックやロックの解除は必要ありません.