Javaソース分析-ARayBlockingQueue
3748 ワード
ArrayBlockingQueueはJDK 1.5 concurrentパッケージで提供されるコンカレントツールクラスは、配列ベースの境界のある先進的な先頭キューであり、要素を追加すると配列がいっぱいになったり、配列が空になったりすると、要素を取ると現在のスレッドがブロックされたりする典型的な生産者消費者モデルです.類似のクラスにはLinkedBlockingQueue,PriorityBlockingQueueがある.
関係を継承
関係を継承 public class ArrayBlockingQueue extends AbstractQueue
implements BlockingQueue, java.io.Serializable
public interface BlockingQueue extends Queue
コアメンバー変数 final Object[] items; //
/** items index for next take, poll, peek or remove */
int takeIndex; //
/** items index for next put, offer, or add */
int putIndex; //
/** Number of elements in the queue */
int count; //
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
final ReentrantLock lock; //
/** Condition for waiting takes */
private final Condition notEmpty; //
/** Condition for waiting puts */
private final Condition notFull; //
構築方法 public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity]; //
lock = new ReentrantLock(fair); //fair
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
要素の追加:putメソッド public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await(); // , , , notFull , notFull signal
enqueue(e); //
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x; //
if (++putIndex == items.length) // putIndex 1, , 0
putIndex = 0;
count++; //
notEmpty.signal(); // , noEmpty , 。
}
putメソッドはまずキューが満たされているかどうかを判断し,満たされている場合は現在のスレッドをブロックし,notFullという条件変数に待機し,notFull呼び出しsignal起動を待つ.キューが空でない場合は、その要素をキューの最後に追加し、要素のカウントを増やし、削除時に配列が空であるためnotEmptyに待機しているスレッドを起動し、キューに要素がすでに存在していることを通知し、取得操作を行うことができます.
要素をとる:takeメソッドtakeメソッドtakeメソッド public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await(); // , , notEmpty
return dequeue(); // , ,
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex]; //
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--; // 1
if (itrs != null)
itrs.elementDequeued();
notFull.signal(); // ,
return x; //
}
takeメソッドはputメソッドと同様に、キューが空であるかどうかを確認し、キューが空である場合、スレッドをブロックし、notEmptyで待機します.キューが空でない場合は、デキュー操作を行います.要素を追加したが、キューがいっぱいになったときにブロックされたスレッドにも通知します.キューは挿入できます.
オリジナルをサポートします.転載は出典を明記してください.github:https://github.com/gatsbydhn
public class ArrayBlockingQueue extends AbstractQueue
implements BlockingQueue, java.io.Serializable
public interface BlockingQueue extends Queue
final Object[] items; //
/** items index for next take, poll, peek or remove */
int takeIndex; //
/** items index for next put, offer, or add */
int putIndex; //
/** Number of elements in the queue */
int count; //
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
final ReentrantLock lock; //
/** Condition for waiting takes */
private final Condition notEmpty; //
/** Condition for waiting puts */
private final Condition notFull; //
構築方法 public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity]; //
lock = new ReentrantLock(fair); //fair
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
要素の追加:putメソッド public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await(); // , , , notFull , notFull signal
enqueue(e); //
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x; //
if (++putIndex == items.length) // putIndex 1, , 0
putIndex = 0;
count++; //
notEmpty.signal(); // , noEmpty , 。
}
putメソッドはまずキューが満たされているかどうかを判断し,満たされている場合は現在のスレッドをブロックし,notFullという条件変数に待機し,notFull呼び出しsignal起動を待つ.キューが空でない場合は、その要素をキューの最後に追加し、要素のカウントを増やし、削除時に配列が空であるためnotEmptyに待機しているスレッドを起動し、キューに要素がすでに存在していることを通知し、取得操作を行うことができます.
要素をとる:takeメソッドtakeメソッドtakeメソッド public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await(); // , , notEmpty
return dequeue(); // , ,
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex]; //
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--; // 1
if (itrs != null)
itrs.elementDequeued();
notFull.signal(); // ,
return x; //
}
takeメソッドはputメソッドと同様に、キューが空であるかどうかを確認し、キューが空である場合、スレッドをブロックし、notEmptyで待機します.キューが空でない場合は、デキュー操作を行います.要素を追加したが、キューがいっぱいになったときにブロックされたスレッドにも通知します.キューは挿入できます.
オリジナルをサポートします.転載は出典を明記してください.github:https://github.com/gatsbydhn
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity]; //
lock = new ReentrantLock(fair); //fair
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await(); // , , , notFull , notFull signal
enqueue(e); //
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x; //
if (++putIndex == items.length) // putIndex 1, , 0
putIndex = 0;
count++; //
notEmpty.signal(); // , noEmpty , 。
}
putメソッドはまずキューが満たされているかどうかを判断し,満たされている場合は現在のスレッドをブロックし,notFullという条件変数に待機し,notFull呼び出しsignal起動を待つ.キューが空でない場合は、その要素をキューの最後に追加し、要素のカウントを増やし、削除時に配列が空であるためnotEmptyに待機しているスレッドを起動し、キューに要素がすでに存在していることを通知し、取得操作を行うことができます.
要素をとる:takeメソッドtakeメソッドtakeメソッド public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await(); // , , notEmpty
return dequeue(); // , ,
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex]; //
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--; // 1
if (itrs != null)
itrs.elementDequeued();
notFull.signal(); // ,
return x; //
}
takeメソッドはputメソッドと同様に、キューが空であるかどうかを確認し、キューが空である場合、スレッドをブロックし、notEmptyで待機します.キューが空でない場合は、デキュー操作を行います.要素を追加したが、キューがいっぱいになったときにブロックされたスレッドにも通知します.キューは挿入できます.
オリジナルをサポートします.転載は出典を明記してください.github:https://github.com/gatsbydhn
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await(); // , , notEmpty
return dequeue(); // , ,
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex]; //
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--; // 1
if (itrs != null)
itrs.elementDequeued();
notFull.signal(); // ,
return x; //
}