ConcurrentLinkedQueueの実現原理分析

5060 ワード

ConcurrentLinkedQueueの実現原理分析
ConcurrentLinkedQueueの紹介
ConcurrentLinkedQueueは、リンクノードに基づく境界のないスレッドセキュリティキューであり、ノードをソートするための先進的なルールを採用しています.要素を追加すると、キューの末尾に追加され、要素を取得すると、キューの先頭の要素に戻ります.このインプリメンテーションは、上述した高効率非ブロックアルゴリズムに基づいて、単純で、迅速で、実用的な非ブロックで、同時キューアルゴリズムをブロックする.
ConcurrentLinkedQueue内部属性および構造方法
private static class Node {
    volatile E item;
    volatile Node next;

    /**
     * Constructs a new node.  Uses relaxed write because item can
     * only be seen after publication via casNext.
     */
    Node(E item) {
        UNSAFE.putObject(this, itemOffset, item);
    }

    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    void lazySetNext(Node val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    boolean casNext(Node cmp, Node val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    // Unsafe mechanics

    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class> k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}


private transient volatile Node head;

private transient volatile Node tail;


public ConcurrentLinkedQueue() {
        head = tail = new Node(null);
    }

public ConcurrentLinkedQueue(Collection extends E> c) {
    Node h = null, t = null;
    for (E e : c) {
        checkNotNull(e);
        Node newNode = new Node(e);
        if (h == null)
            h = t = newNode;
        else {
            t.lazySetNext(newNode);
            t = newNode;
        }
    }
    if (h == null)
        h = t = new Node(null);
    head = h;
    tail = t;
}


インキューオペレーションoffer()
public boolean offer(E e) {
    checkNotNull(e);
    final Node newNode = new Node(e);

    for (Node t = tail, p = t;;) {
        Node q = p.next;
        if (q == null) {
            // p is last node
            if (p.casNext(null, newNode)) {
                // Successful CAS is the linearization point
                // for e to become an element of this queue,
                // and for newNode to become "live".
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q)
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet.
            p = (t != (t = tail)) ? t : head;
        else
            // Check for tail updates after two hops.
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

ソースコードの観点から見ると、エンキュープロセス全体が主に2つのことをしています.1つ目はエンドノードを特定し、2つ目はCASアルゴリズムを使用してエンキューノードをエンドノードに設定できるnextノードで、成功しなければ再試行します.最初のステップは、末尾ノードを配置します.tailノードは常にエンドノードではないので、エンキューするたびにtailノードを介してエンドノードを見つけなければなりません.エンドノードはtailノードであるか、tailノードのnextノードである可能性があります.コード中のループ体の最初のifはtailにnextノードがあるかどうかを判断し、nextノードがテールノードである可能性があることを示す.tailノードのnextノードを取得するには、pノードがpのnextノードに等しい場合に注意しなければならない.pノードとpのnextノードが空である可能性があるのは、このキューが初期化されたばかりで、最初のノードを追加しようとしているため、headノードに戻る必要があることを示す.
ステップ2では、エンキューノードをエンドノードとして設定します.p.casNext(null,n)メソッドは、エンキューノードを現在のキューの末尾ノードのnextノードに設定するために使用され、pがnullであればpが現在のキューの末尾ノードであり、nullでなければ他のスレッドが末尾ノードを更新したことを示す場合、現在のキューの末尾ノードを再取得する必要がある.
アウトキュー操作poll()
public E poll() {
    restartFromHead:
    for (;;) {
        for (Node h = head, p = h, q;;) {
            E item = p.item;
             //   p        ,  CAS  p        null,       p     。
            if (item != null && p.casItem(item, null)) {
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

まず、ヘッダノードの要素を取得し、ヘッダノードの要素が空かどうかを判断し、空であれば、別のスレッドが1回のデキュー操作を行ってそのノードの要素を取り出したことを示し、空でなければCASを使用してヘッダノードの参照をnullに設定し、CASが成功すれば、ヘッダノードの要素を直接返し、成功しなければ、別のスレッドが1回のデキュー操作でheadノードを更新し、要素が変化したことを示し、ヘッダノードを再取得する必要があります.