ブロックされていない無境界スレッドセキュリティキュー-ConcurrentLinkedQueue


前言
JUCの次の関連ソースコードを続けて読むと、ブロックされていない境界のないスレッドセキュリティキュー--ConcurrentLinkedQueueが見えます.一緒に見てみましょう.
紹介する
リンクノードの無境界スレッドセキュリティキューに基づいて、要素FIFO(先頭先頭先頭)をソートします.キューのヘッダはキュー内で最も長い時間の要素で、キューの末尾はキュー内で最も短い時間の要素です.キューの末尾に新しい要素を挿入し、キュー検索操作でキューヘッダの要素を取得します.
多くのスレッドが共通セットへのアクセスを共有する場合、ConcurrentLinkedQueueは適切な選択です.他のほとんどの同時集合実装と同様にnull要素は使用できません.
きほんしよう
public class ConcurrentLinkedQueueTest {

    public static void main(String[] args) {

        ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();

        //              。
        queue.add("liuzhihang");
        //              。
        queue.offer("liuzhihang");

        //            ,       null。
        queue.peek();
        //           ,        null。
        queue.poll();
        
    }
}

ソース分析
きほんこうぞう
パラメータの説明
private static class Node {
    
    //       
    volatile E item;
    //      
    volatile Node next;

    Node(E item) {
        UNSAFE.putObject(this, itemOffset, item);
    }
    // CAS          
    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }
    //        
    void lazySetNext(Node val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    // CAS           
    boolean casNext(Node cmp, Node val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    //    ……
}

ConcurrentLinkedQueueの内部には内部クラスNodeが含まれています.上記のように、この内部クラスはチェーンテーブルのノードを識別するために使用され、コードから、ConcurrentLinkedQueueのチェーンテーブルは であることがわかります.
public class ConcurrentLinkedQueue extends AbstractQueue
        implements Queue, java.io.Serializable {
        
    //     

    //    
    private transient volatile Node head;      

    //    
    private transient volatile Node tail;
}

ヘッダノードはvolatileで修飾され、メモリの可視性を保証します.
コンストラクタ
public ConcurrentLinkedQueue() {
    head = tail = new Node(null);
}

オブジェクトを作成すると、ヘッダとテールノードは空のノードを指します.
要素の追加
public boolean add(E e) {
    return offer(e);
}
public boolean offer(E e) {
    
    //       
    checkNotNull(e);

    //     
    final Node newNode = new Node(e);

    //      
    // t       ,p     t
    for (Node t = tail, p = t;;) {
        // q           
        Node q = p.next;
        if (q == null) {
            //   ,        ,  CAS      
            if (p.casNext(null, newNode)) {
                //    p.next   newNode
                //    p != t      
                if (p != t) 
                    //           tail
                    // q = p.next    q == null     
                    // q     t.next
                    //     tail        
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        //       , poll ,      ,      p == q 
        //        
        else if (p == q)
            // 
            p = (t != (t = tail)) ? t : head;
        else
            //    tail    
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

図面の説明:
  • 単一スレッドの場合:
  • Node q = p.next;に実行すると、現在の状況は図に示すように
  • である.
  • q == nullと判断し、条件を満たすとp.casNext(null, newNode)はCASを用いてp.nextを設定する.
  • の設定に成功した後、p == tは変動しなかったため、プログラムは終了した.
  • マルチスレッドの場合:
  • Node q = p.next;に実行すると、現在の状況は図に示すように
  • である.
  • 複数のスレッドは、CASを使用してp.nextを設定するp.casNext(null, newNode)を実行する.
  • AスレッドCAS設定成功:
  • BスレッドCASの実行に失敗し、再ループし、p = (p != t && t != (t = tail)) ? t : qに実行されます.
  • を再ループすれば正常に設定できます.

  • 要素の取得
    public E poll() {
        restartFromHead:
        //     
        for (;;) {
            for (Node h = head, p = h, q;;) {
                //      iterm
                E item = p.item;
                //          null CAS     null
                if (item != null && p.casItem(item, null)) {
                    // CAS         
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                //           null
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                //     ,       
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

    描画プロセスは次のとおりです.
  • 内層ループが実行されると、キューが空の場合:E item = p.item; itermがnullになり、updateHead(h, p)がnullに戻ります.
  • 同時挿入操作が同時にあると仮定し、図に示すように要素を追加する.
    最後のelseが実行されますp = q
  • は、itemの取得をループし続け、p.casItem(item, null)を実行し、p != hと判断し、headを更新してitemに戻る.

  • ここの場合は複雑ですが、ここでは1つだけ挙げて、必要なら自分でいくつか挙げてもいいです.
    要素のコードを表示することと要素のコードを取得することはあまり紹介されていません.
    size操作
    public int size() {
        int count = 0;
        for (Node p = first(); p != null; p = succ(p))
            if (p.item != null)
                // Collection.size() spec says to max out
                if (++count == Integer.MAX_VALUE)
                    break;
        return count;
    }

    CASはロックされていないので、sizeは正確ではありません.さらにsizeはリストを1回巡り、パフォーマンスを消費します.
    まとめ
    ConcurrentLinkedQueueは仕事で使われることが比較的少ないので、関連するソースコードを読むときも大体見て、よく使われるAPI、および下位の原理を理解しただけです.
    簡単にまとめると、一方向チェーンテーブルを使用してキュー要素を保存し、内部ではブロックされていないCASアルゴリズムを使用してロックされていません.したがってsizeの計算は正確ではない可能性があり、同じsizeはチェーンテーブルを遍歴するので、使用することは推奨されません.