コンカレントリンク
9060 ワード
ConccurrentlinkedQueはQueの安全な実現です.Queの中の元素はFIFOの原則によって並べ替えられます.CAS操作を採用して、元素の整合性を保証します.
データ構造は:一方向リンク表.
変数はvolatileを使って修正して、内部の視認性を保証します.変数に対する書き込み操作は後続の読み操作に対して見られます.同様にCPU指令の並び替えにもつながりません.
1.末尾結点tを取得し、p=tを設定し、qはtの次の結点として、qが空かどうかを判断します.空であれば、最後の結点として説明し、pのnext nodeをCASで更新します.
成功すれば、trueに戻ります.失敗したらnext nodeが更新されたと説明したら、実行サイクルは1から実行します.成功すれば2.
2.pとqが同一のオブジェクトかどうかを判断し、CAS更新テールを使用しないと(併発時に発生する可能性があります)、更新失敗の説明にテールがあります.成功に失敗してもtrueに戻ります.
3.もしqがnullでなく、p=qであるなら、tailが変化していないなら、jump to head、再検査して、引き続き1を実行する.
4.qがnullでなく、p<>qである場合、つまり、ノードの後ろに要素があるということは、ノードを後ろに移動して、1を実行し続ける必要がある.
1.p=head、p.itemが空いていない場合、CASを通じてp.itemをnullに更新し、更新が成功すればitemに戻ります.
2.p.itemが空であれば、p.nextが空かどうかを判断し、もし空であればheadをpと更新する.
3.p.nextが空でないなら、p=q(p.next)を再実行します.
4.p.nextが空でない場合、p=qが1の更新に失敗したと説明したら、再度1を実行します.
なお、テールノードのテールの動作は、一時変数tとsに変更される必要があり、一方でvolatile変数の可変性を除去するためであり、他方では、volatileの性能影響を低減するためである.
データ構造は:一方向リンク表.
変数はvolatileを使って修正して、内部の視認性を保証します.変数に対する書き込み操作は後続の読み操作に対して見られます.同様にCPU指令の並び替えにもつながりません.
private static class Node<E> {
volatile E item;
volatile Node<E> next;
/**
* Constructs a new node. Uses relaxed write because item can
* only be seen after publication via casNext.
*/
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
/***
* Sets the value of the object field at the specified offset in the
* supplied object to the given value, with volatile store semantics.
* obj offset object field 。 volatile store
*
* @param obj the object containing the field to modify.
* field
* @param offset the offset of the object field within <code>obj</code>.
* <code>obj</code> object field
* @param value the new value of the field.
* field
* @see #putObject(Object,long,Object)
*/
public native void putObjectVolatile(Object obj, long offset, Object value);
/***
* Compares the value of the object field at the specified offset
* in the supplied object with the given expected value, and updates
* it if they match. The operation of this method should be atomic,
* thus providing an uninterruptible way of updating an object field.
* obj offset object field , 。
* , object field。
*
* @param obj the object containing the field to modify.
* field
* @param offset the offset of the object field within <code>obj</code>.
* <code>obj</code> object field
* @param expect the expected value of the field.
* field
* @param update the new value of the field if it equals <code>expect</code>.
* expect field , filed
* @return true if the field was changed.
* field
*/
public native boolean compareAndSwapObject(Object obj, long offset,
Object expect, Object update);
/***
* Sets the value of the object field at the specified offset in the
* supplied object to the given value. This is an ordered or lazy
* version of <code>putObjectVolatile(Object,long,Object)</code>, which
* doesn't guarantee the immediate visibility of the change to other
* threads. It is only really useful where the object field is
* <code>volatile</code>, and is thus expected to change unexpectedly.
* obj offset object field 。
* <code>putObjectVolatile</cdoe> ,
* 。 field <code>volatile</code>
* 。
*
* @param obj the object containing the field to modify.
* field
* @param offset the offset of the object field within <code>obj</code>.
* <code>obj</code> long field
* @param value the new value of the field.
* field
*/
public native void putOrderedObject(Object obj, long offset, Object value);
構造headとtailは全部空のNodeです.
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
AbstractQue、Queだけ継承していますので、add(e)、remove、offer(e)、poll.主要な方法があります.
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
Queueに全部入れます.1.末尾結点tを取得し、p=tを設定し、qはtの次の結点として、qが空かどうかを判断します.空であれば、最後の結点として説明し、pのnext nodeをCASで更新します.
成功すれば、trueに戻ります.失敗したらnext nodeが更新されたと説明したら、実行サイクルは1から実行します.成功すれば2.
2.pとqが同一のオブジェクトかどうかを判断し、CAS更新テールを使用しないと(併発時に発生する可能性があります)、更新失敗の説明にテールがあります.成功に失敗してもtrueに戻ります.
3.もしqがnullでなく、p=qであるなら、tailが変化していないなら、jump to head、再検査して、引き続き1を実行する.
4.qがnullでなく、p<>qである場合、つまり、ノードの後ろに要素があるということは、ノードを後ろに移動して、1を実行し続ける必要がある.
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
Queueからitemを取る全過程.1.p=head、p.itemが空いていない場合、CASを通じてp.itemをnullに更新し、更新が成功すればitemに戻ります.
2.p.itemが空であれば、p.nextが空かどうかを判断し、もし空であればheadをpと更新する.
3.p.nextが空でないなら、p=q(p.next)を再実行します.
4.p.nextが空でない場合、p=qが1の更新に失敗したと説明したら、再度1を実行します.
public int size() {
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null)
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
return count;
}
キューサイズを取得するプロセスは、カウンタがないため、キューのサイズをカウントします.キューのサイズは最初から最後まで完全にキューを巡回するしかないです.ですから、通常はコンカレントリンクQueとAtomicIntegerを組み合わせてキューサイズを取得する必要があります.なお、テールノードのテールの動作は、一時変数tとsに変更される必要があり、一方でvolatile変数の可変性を除去するためであり、他方では、volatileの性能影響を低減するためである.