[Javaコンカレントシリーズ]4.Javaコンカレントコンテナとフレームワーク

9160 ワード

Javaコンカレントコンテナ/フレームワークを使用すると、コンカレントプログラムの開発効率が向上し、より優れたコンカレントプログラムを設計できます.Javaコンカレントコンテナ/フレームワークはjava.util.concurrentパッケージにあります.よく使用されるコンカレントコンテナは次のとおりです.
  • ConcurrentHashMap
  • ConcurrentLinkedQueue
  • ブロッキングキュー
  • 1.ConcurrentHashMapの実現原理と使用(1.7に基づく)
    ConcurrentHashMapはスレッドが安全で効率的なHashMapであり、スレッドの安全を保証しながら効率的な操作を保証することができます.
    1.同時環境におけるHashMapの問題
    ①同時環境では、HashMapのput操作後にget操作がデッドサイクルになる可能性があります.HashMapはファスナー法を使用してハッシュ競合を解決し、つまりチェーンテーブルによって配列の同じ位置にハッシュされた値を格納するためです.同時環境でHashMapを使用すると、配列内のチェーンテーブルにループチェーンテーブルが形成される可能性があり、その後のget操作ではe=e.nextを使用してチェーンテーブルに要素があるかどうかを判断し、チェーンテーブルを形成するとe=e.next操作が無限にループし、Infinite Loopのエラーが発生します.
    ②同時環境では、HashMapを操作するputメソッドによって要素が失われる可能性があります.マルチスレッドでputを操作するとaddEntry(hash,key,value,i)が実行され、ハッシュ衝突が発生した場合、2つのスレッドが同じbucketIndexを得て格納され、上書き損失が発生する可能性があります.
    2.同時環境におけるスレッドセキュリティのハッシュ表構造の取得方法
  • CollectionsクラスのsynchronizedMap(Mapm)静的メソッドを使用してスレッドの安全なMapを取得し、このmap上で操作します.
  • Map m = Collections.synchronizedMap(new HashMap());
    
  • は、メソッドまたは呼び出されたメソッドにsynchronizedキーワードを追加することによってスレッドのセキュリティを保証するHashTableを使用する.スレッド競合が激しい場合、HashTableの効率は非常に低下する可能性があります.HashTableでは、スレッドが同じロックを競合しなければならないためです.そのため、1つのスレッドがHashTableの同期メソッドにアクセスすると、他のスレッドはブロックまたはポーリング状態に入るしかありません.
  • はConcurrentHashMapを使用し、ConcurrentHashMapはロックセグメント技術を使用し、Mapのスレッドの安全を保証し、Mapの効率を保証することができる.

  • 3.ConcurrentHashMapの構成
    ConcurrentHashMapはSegment配列構造とHashEntry配列構造からなる.Segmentは再ロック可能であり、配列とチェーンテーブルからなり、ConcurrentHashMapでは1つのSegment配列しかなく、1つのSegmentには1つのHashEntry配列しか含まれず、各HashEntryは1つのチェーンテーブル構造の要素である.HashEntry配列のデータを変更する場合は、まずそれに対応するSegmentロックを取得する必要があります.
    4.ConcurrentHashMapの常用操作ソースコード解読(1.8)
    本機jdk環境は1.8で、1.8のソースコードで分析して、その後1.7と1.8の中の比較の総括があります
    一、put操作
    public V put(K key, V value) {
            return putVal(key, value, false);
        }
    
    final V putVal(K key, V value, boolean onlyIfAbsent) {
            //       
            if (key == null || value == null) throw new NullPointerException();
            //  hash,     ,       
            int hash = spread(key.hashCode());
            int binCount = 0;
            //   table         
            for (Node[] tab = table;;) {
                Node f; int n, i, fh;
                //        ,         (    1.7     ,      )
                if (tab == null || (n = tab.length) == 0)
                    tab = initTable();
                //  i       ,     ,      CAS          
                else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                    //  CAS    
                    if (casTabAt(tab, i, null,
                                 new Node(hash, key, value, null)))
                        break;        // no lock when adding to empty bin
                }
                //      ,        
                else if ((fh = f.hash) == MOVED)
                    tab = helpTransfer(tab, f);
                else {
                    V oldVal = null;
                    //        ,     hash  ,         ,            
                    synchronized (f) {
                        if (tabAt(tab, i) == f) {
                            if (fh >= 0) {//            
                                binCount = 1;
                                //          
                                for (Node e = f;; ++binCount) {
                                    K ek;
                                    //         key,           value 
                                    if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                         (ek != null && key.equals(ek)))) {
                                        oldVal = e.val;
                                        if (!onlyIfAbsent)
                                            e.val = value;
                                        break;
                                    }
                                    //     key,            
                                    Node pred = e;
                                    if ((e = e.next) == null) {
                                        pred.next = new Node(hash, key,
                                                                  value, null);
                                        break;
                                    }
                                }
                            }
                            else if (f instanceof TreeBin) {//             
                                Node p;
                                binCount = 2;
                                //       
                                if ((p = ((TreeBin)f).putTreeVal(hash, key,
                                                               value)) != null) {
                                    oldVal = p.val;
                                    if (!onlyIfAbsent)
                                        p.val = value;
                                }
                            }
                        }
                    }
                    if (binCount != 0) {
                    //           8 ,           
                        if (binCount >= TREEIFY_THRESHOLD)
                            treeifyBin(tab, i);
                        if (oldVal != null)
                            return oldVal;
                        break;
                    }
                }
            }
            addCount(1L, binCount);
            return null;
        }
    

    put操作フロー:
  • まずkeyに対して2回hash計算を行う.
  • tableが初期化されているかどうかを確認し、初期化されていない場合は、まず初期化を行う.
  • hash競合がなければ、CASを使用して要素を直接挿入します.
  • 拡張が必要であれば、まず拡張操作を行う.
  • Hash衝突が発生した場合、チェーンテーブルまたは赤黒ツリーのノードをロックし、Key/valueをチェーンテーブルの尾に挿入するか、赤黒ツリー構造に従って挿入する.
  • 挿入終了後、tableの位置のチェーンテーブル長が8より大きいか否かを判断する必要があり、8より大きいとチェーンテーブルを赤黒ツリー構造に変換する.
  • 最終統計size;

  • 二、get操作
    
        public V get(Object key) {
            Node[] tab; Node e, p; int n, eh; K ek;
            // key    hash  ,    table    
            int h = spread(key.hashCode());
            if ((tab = table) != null && (n = tab.length) > 0 &&
                (e = tabAt(tab, (n - 1) & h)) != null) {//            
                if ((eh = e.hash) == h) {//        hash  key   hash    ,        equals()  ,   key     key     ,    ,   ;
                   if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                        return e.val;
                }
                //hash          ,       ForwardingNode find      nextTable    ,
                else if (eh < 0)
                    return (p = e.find(h, key)) != null ? p.val : null;
                //         ForwardingNode,      
                while ((e = e.next) != null) {
                    if (e.hash == h &&
                        ((ek = e.key) == key || (ek != null && key.equals(ek))))
                        return e.val;
                }
            }
            return null;
        }
    

    get操作フロー:
  • keyのhash値を計算し、tableのインデックス位置に位置決めし、ヘッダノードであれば直接戻る.
  • 拡張中に遭遇した場合、拡張中であることを示すForwardingNodeのfindメソッドを呼び出し、ノードを検索し、マッチングは直接返される.
  • 上記の状況が一致しない場合、チェーンテーブル上のノードを遍歴し、key.equals(ek)を使用してkey値をマッチングし、マッチングは戻る.

  • 三、size操作
    public int size() {
            long n = sumCount();
            return ((n < 0L) ? 0 :
                    (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                    (int)n);
        }
    
       final long sumCount() {
            CounterCell[] as = counterCells; CounterCell a;
            long sum = baseCount;
            if (as != null) {
            // CounterCell baseCount  
                for (int i = 0; i < as.length; ++i) {
                    if ((a = as[i]) != null)
                        sum += a.value;
                }
            }
            return sum;
        }
    

    5.1.7と1.8のConcurrentHashMapの違い
  • 1.8ではHashEntry+Synchronized+CAS+赤黒ツリー制御を併用し、1.7ではSegment+Reentrantlock+HashEntryを用いて実現する.1.7中錠の粒度はSegmentに基づくが、1.8中錠の粒度はHashEntry(ヘッダノード)に基づくので、1.8中錠の粒度は
  • より低くなる.
  • 1.8ではSynchronizedを使用して同期を行うため、セグメント化する必要がなく、Segmentセグメントのデータ構造を必要とせず、実装の複雑さを低減する.
  • 1.8では、レッドブラックツリーを使用してチェーンテーブルを最適化し、ロングチェーンテーブルを使用して遍歴するのに時間がかかるプロセスですが、レッドブラックツリーに基づいて検索すると、検索速度が大幅に向上します.

  • 2. ConcurrentLinkedQueue
    同時環境では、スレッドの安全なキューを得るには、2つの方法で取得できます.1つはブロックによって取得され、もう1つは非ブロックによって実現されます.ConcurrentLinkedQueueは、ブロックされていない方法で実現されるスレッドの安全なキューです.ConcurrentLinkedQueueは、チェーンテーブルベースの無境界スレッドセキュリティキューです.要素を追加すると、キューの末尾に要素が追加され、要素を取得すると、キューのヘッダから要素が返されます.
    3.Javaのブロックキュー
    ≪キューのブロック|Block Queue|emdw≫:キューがいっぱいになると、キューに要素を挿入するスレッドがブロックされ、キューが満たされないまでブロックされます.キューが空の場合、取得要素のスレッドがブロックされ、キューが空でないまでブロックされます.ブロックキューは、生産者消費者のシーンでよく使用され、生産者はキューに要素を挿入するスレッドであり、消費者はキューから要素を取り出すスレッドである.したがって、ブロックキューは、生産者が要素を格納し、消費者が要素を消費するための容器である.
  • ArrayBlockingQueue配列を用いて実現される境界ブロックキュー
  • LinkedBlockingQueueチェーンテーブルを使用して実装された境界ブロックキューです.デフォルトの構造関数は次のとおりです.
  • //
     public LinkedBlockingQueue() {
            this(Integer.MAX_VALUE);
        }
    
  • PriorityBlockingQueue優先順位付けをサポートする無境界ブロックキュー
  • DelayQueue優先キューを使用して実現される無境界ブロックキュー
  • SynchronousQueueは要素を格納しないブロックキューであり、各put操作はtake操作を待たなければならない.そうしないと要素を追加できない.
  • LinkedTransferQueueチェーンテーブルを使用して実現される無境界ブロックキュー
  • LinkedBlockingDequeチェーンテーブルからなる双方向ブロックキュー
  • を使用
    ブロックキューの実現原理:通知モードを使用して、生産者がいっぱいのキューに要素を追加すると、ブロックされ、消費者がキューから要素を消費すると、生産者に現在のキューが利用可能であることを通知する.消費者が空のキューで消費しようとすると、ブロックされ、生産者がこのキューで要素を生産すると、このキューが利用可能であることが消費者に通知されます.