Linked Blocking Queソースの分析
概要
Linked BlockingQueチェーンテーブル(シングルチェーンテーブル)をベースに、最初の列は、headは列の中で最も長い要素が存在し、tailは最新の要素であり、要素を追加して、列の最後から追加します.要素を削除して、先頭から削除します.nullの挿入は禁止されていますスレッドは安全で、Rentrant Lockでスレッドの安全を実現します.使い方はアラーブロックQueと一致しています.
AtomicInteger類
AtomicIntegerは、原子操作を提供するIntegerのクラスです.Java言語では、+iとi++の操作はスレッドの安全ではなく、使用時には避けられないsynchronizedのキーワードが使われます.AtomicIntegerはスレッドの安全な加减操作インターフェースを通じています.
public final int get()/現在の値public final int get AndSet//現在の値を取得し、public final int値を設定してAndIncrement()/現在の値を取得し、public final final intからAndefident値を取得し、現在の値を増量してからAndetdededefidededededededetAndfirement/putを取得します.
構造関数
コンストラクタ:デフォルトの容量はInteger.MAXです.固定容量Javaは、右から左に1つずつ値を割り当てられるように指定することもでき、例えば、A=B=C=0は、まずCに0、すなわちC=0を割り当て、その後B=Cとする.最後にA=Bです
add()方法:列がいっぱいになる前に直接追加して、trueに戻ります.そうでないと、キューが異常でいっぱいです.
element():列は空ではなく、キューの要素値を返します.さもないと異常を報告します
poll():列の最後から削除して、列は空で、直接nullに戻ります.空ではなく、削除されたノードの値を返します.
Linked BlockingQueチェーンテーブル(シングルチェーンテーブル)をベースに、最初の列は、headは列の中で最も長い要素が存在し、tailは最新の要素であり、要素を追加して、列の最後から追加します.要素を削除して、先頭から削除します.nullの挿入は禁止されていますスレッドは安全で、Rentrant Lockでスレッドの安全を実現します.使い方はアラーブロックQueと一致しています.
AtomicInteger類
AtomicIntegerは、原子操作を提供するIntegerのクラスです.Java言語では、+iとi++の操作はスレッドの安全ではなく、使用時には避けられないsynchronizedのキーワードが使われます.AtomicIntegerはスレッドの安全な加减操作インターフェースを通じています.
public final int get()/現在の値public final int get AndSet//現在の値を取得し、public final int値を設定してAndIncrement()/現在の値を取得し、public final final intからAndefident値を取得し、現在の値を増量してからAndetdededefidededededededetAndfirement/putを取得します.
構造関数
コンストラクタ:デフォルトの容量はInteger.MAXです.固定容量Javaは、右から左に1つずつ値を割り当てられるように指定することもでき、例えば、A=B=C=0は、まずCに0、すなわちC=0を割り当て、その後B=Cとする.最後にA=Bです
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
// head ; last head;(last,head ), head,last null, next null;
last = head = new Node<E>(null);
}
add()、offer()、put()を追加します.add()方法:列がいっぱいになる前に直接追加して、trueに戻ります.そうでないと、キューが異常でいっぱいです.
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
offer():追加成功するかどうか;キューがいっぱいになりました.追加に失敗しました.falseに戻ります.未満、追加成功、trueに戻ります.public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
//
if (count.get() == capacity)
return false;
int c = -1;
//
Node<E> node = new Node<E>(e);
//
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//
if (count.get() < capacity) {
enqueue(node);
// , ; ,+1 ; , await ;
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
// await ;
if (c == 0)
signalNotEmpty();
return c >= 0;
}
enqueue():列の最後に元素ノードを追加します.チェーンの端に新しいノードを追加します.Javaは右から左に1つずつ値を割り当てます.例えば、A=B=C=0はまずCに0を割り当てます.即ちC=0、そしてB=Cです.最後にA=Bですprivate void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
// last.next node; head,last next node ;
// last last.next( node ); last = node;
// : , , head next ; , last;
last = last.next = node;
}
put():挿入時、列がいっぱいになったら、スレッドがブロックされます.直接挿入しない;public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// ,
while (count.get() == capacity) {
notFull.await();
}
//
enqueue(node);
c = count.getAndIncrement();
// ;
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// ;
if (c == 0)
signalNotEmpty();
}
チームヘッド要素を取得します.element()、peek()element():列は空ではなく、キューの要素値を返します.さもないと異常を報告します
public E element() {
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
peek():列が空で、nullに戻ります.空ではなく、キューヘッドの要素値を返します.public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//head next enqueue() , ;
return (count.get() > 0) ? head.next.item : null;
} finally {
takeLock.unlock();
}
}
poll()、take()を削除して、remove()poll():列の最後から削除して、列は空で、直接nullに戻ります.空ではなく、削除されたノードの値を返します.
public E poll() {
final AtomicInteger count = this.count;
//
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
dequeue():キューヘッドの結点の値を返します.private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
// head null,next ; h;
Node<E> h = head;
// ;
Node<E> first = h.next;
h.next = h; // help GC
// head
head = first;
// x;
E x = first.item;
// null, , next ;
first.item = null;
return x;
}
Take():列が空なら、スレッドが詰まっています.空ではなく、キューヘッドの結点に対応する値を直接返します.public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
remove():キューからオブジェクトを削除します.trueキューはこの要素が存在し、削除に成功しました.falseキューにはこの要素が存在しません.public boolean remove(Object o) {
if (o == null) return false;
fullyLock();
try {
// ,
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
unlink()メソッド://p ;trail p
void unlink(Node<E> p, Node<E> trail) {
// p null;
p.item = null;
//trail p ;
trail.next = p.next;
// p , last=trail;
if (last == p)
last = trail;
// await
if (count.getAndDecrement() == capacity)
notFull.signal();
}
以上がLinked Blocking Queの主要な方法です.問題があれば、よろしくお願いします.