Java 8のConcurrentHashMapがスレッドのセキュリティをどのように保証しているかを解読します.
43063 ワード
HashMapは、作業中に使用頻度が非常に高いK-V格納容器です.マルチスレッド環境ではHashMapの使用は安全ではなく,種々の望ましくない結果を生じる可能性がある.
HashMapスレッドセキュリティの問題については、筆者のもう一つの文章を参考にしてください.HashMapスレッドセキュリティの問題を深く解読します.
HashMapがマルチスレッド環境で安全でないという問題に対して,HashMapの著者らは,これはバグではなく,スレッド安全なHashMapを使用すべきであると考えている.
スレッドの安全なHashMapを得るには、次の方法があります. Collections.synchronizedMap HashTable ConcurrentHashMap
このうち,前の2つの方式はグローバルロックの問題で深刻な性能問題があった.そのため、有名なコンカレントプログラミングの巨匠Doug LeaはJDK 1.5のjava.util.concurrentパッケージの下にコンカレントツールをたくさん追加しました.ここにはConcurrentHashMapというスレッドの安全なHashMapが含まれています.
本稿では,ConcurrentHashMapの実現原理を簡単に紹介する.
PS:JDK 8ベース
0 ConcurrentHashMapのJDK 7でのレビュー
ConcurrentHashMapはJDK 7とJDK 8で実装方式が大きく異なる.まず、JDK 7におけるConcurrentHashMapの原理を概説してみましょう.
0.1セグメントロックテクノロジ
HashTableがhashテーブル全体をロックするという問題に対して,ConcurrentHashMapはセグメントロックの解決策を提案した.
セグメントロックの考え方は、ロック時にhashテーブル全体をロックするのではなく、一部だけをロックすることです.
どのように実現しますか?これはConcurrentHashMapで最も重要なSegmentに使用されます.
ConcurrentHashMapには1つのSegment配列が維持されており、各SegmentはHashMapと見なすことができる.
Segment自体はReentrantLockを継承しており、それ自体がロックです.
Segmentでは、HashEntry配列によって内部のhashテーブルが維持されます.
各HashEntryはmapのK-Vを表し、HashEntryでチェーンテーブル構造を構成し、nextフィールドで次の要素を参照することができます.
上記の内容はソースコードに以下のように表示されます.
従って、JDK 7では、ConcurrentHashMapの全体構成を下図のように記述することができる.
上図から,我々のhash値が十分に分散している限り,putのたびにputが異なるsegmentに移行することが分かる.一方、segment自体はロックであり、putの場合、現在のsegmentは自分をロックします.この場合、他のスレッドはこのsegmentを操作できませんが、他のsegmentの操作には影響しません.これがロックセグメントがもたらすメリットです.
0.2スレッドの安全なput
ConcurrentHashMapのputメソッドのソースコードは次のとおりです.
最終的にsegmentのputメソッドが呼び出され、要素putがHashEntry配列に入力されます.ここでのコメントではロックに関する説明のみが表示されます.
0.3スレッドの安全な拡張(Rehash)
HashMapのスレッドセキュリティの問題の大部分は拡張(rehash)の過程にある.
ConcurrentHashMapの拡張は、各segmentのHashEntry配列に対してのみ拡張される.
上記putのソースコードから分かるように、ConcurrentHashMapはrehashの際にロックされているため、rehashの過程で他のスレッドがsegmentのhashテーブルを操作できないため、スレッドの安全が保証される.
1 JDK 8におけるConcurrentHashMapの初期化
無パラメータコンストラクション関数を例に、ConcurrentHashMapクラスが初期化されたときに何をするかを見てみましょう.
まず、静的コードブロックと初期化クラス変数が実行されます.主に次のクラス変数を初期化します.
ここではUnsafeクラスが使用され、objectFieldOffsetメソッドは、sizeCtlなどの指定されたFieldのメモリ内のオフセット量を取得するために使用されます.
取得したこのオフセット量は主に何に使いますか?焦らずに、以下の分析で、出会ったときに検討すればいいです.
PS:Unsafeの紹介と使用について、筆者のもう一つの文章Unsafe類の紹介と使用を見ることができる.
2内部データ構造
まず、JDK 8でどのように定義されているストレージ構造をソースコードの観点から見てみましょう.
JDK 8とJDK 7の実装は大きく異なり,JDK 8ではSegmentの概念は使用されず,HashMapの実装方式に似ていることが分かった.
PS:HashMapの原理については、筆者のもう一つの文章HashMapの原理及び内部記憶構造を参考にすることができる
この構造は下図で説明できる
3スレッドの安全なhashテーブルの初期化
ConcurrentHashMapはtableというメンバー変数を用いてhashテーブルを持つことが前述から分かる.
tableの初期化には遅延初期化ポリシーが採用されており、putを初めて実行するときにtableを初期化します.
putメソッドのソースコードは以下の通りです(一時的に関連しないコードは省略します):
initTableソースコードは以下の通りです.
メンバー変数sizeCtlのConcurrentHashMapの1つの役割はHashMapのthresholdに相当し、hashテーブルの要素の個数がsizeCtlを超えると、拡張がトリガーされる.彼のもう一つの役割は、例えば、彼が-1に等しい場合、hashテーブルの初期化を実行しているスレッドがあることを示し、-1未満の値は、あるスレッドがhashテーブルに対してresizeを実行していることを示す.
この方法はまずsizeCtlが0未満であるか否かを判断し,0未満であれば現在のスレッドを直接準備状態のスレッドに変える.
sizeCtlが0以上の場合、現在のスレッドはCASでsizeCtlの値を-1に変更しようとします.修正に失敗したスレッドは次のループに入り、sizeCtl<0と判断し、yieldに住まれる.正常なスレッドを変更すると、次の初期化コードが実行されます.
new Node[]の前に、tableが空であるかどうかをもう一度チェックします.ここで二重チェックをするのは、別のスレッドが#1コードを実行した後に保留されている場合、別の初期化されたスレッドが#6のコードを実行した場合、sizeCtlが0より大きい値である場合、このスレッドを再カットして実行する場合、初期化を繰り返す可能性があるからです.この問題については下図の同時シーンで説明します.
次にhashテーブルを初期化し、sizeCtlの値を再計算し、最終的に初期化されたhashテーブルを返します.
次の図は、hashテーブルの再初期化を引き起こす可能性のあるいくつかの同時シーンを詳細に示しており、Thread 2が最終的にhashテーブルの初期化に成功したと仮定しています. Thread 1は、CASがsizeCtl変数を更新する同時シーン をシミュレートする. Thread 2はtableの二重検査の必要性 をシミュレートする.
上の図から分かるように、Thread 1でsizeCtlの値更新を同時制御しなければ、Thread 1はnew Node[]に進む可能性がある.Thread 3では、二重の判断をしなければ、Thread 3もnew Node[]に進む.
4スレッドの安全なput
put操作は以下の2種類に分けられる現在hashテーブルが現在keyに対応するindexに要素がない場合 現在のhashテーブルが現在のkeyに対応するindexに既に要素が存在する場合(hash衝突) 4.1 hash表に要素がない場合
対応するソースコードは以下の通りです.
tabAtメソッドは、Unsafe.getobjectVolatile()によって配列対応index上の要素を取得し、getobjectVolatileは対応するメモリオフセット量に作用し、volatileメモリの意味を備えている.
取得が空の場合は、casで配列の指定indexに新しいノードを作成してみます.
4.2 hash衝突時
対応するソースコードは以下の通りです.
JDK 7におけるsegmentの概念とは異なり,JDK 8ではチェーンテーブルのヘッダノードを直接ロックとする.JDK 7では、HashMapはマルチスレッド同時putの場合にリングチェーンテーブルを形成する可能性があり、ConcurrentHashMapはこのロック方式により、同じ時間に1つのスレッドだけがあるチェーンテーブルに対してputを実行させ、同時問題を解決した.
5スレッドの安全な拡張
putメソッドの最後のステップは、sizeCtlの値を超えると拡張がトリガーされるhashテーブルの要素の個数を統計することです.
拡張コードはやや長いので、中の中国語の注釈を大まかに見て、次の分析を参考にしてください.実は私たちの主な目的はConcurrentHashMapがどのようにHashMapの同時問題を解決したのかを理解することです.この問題を持ってソースコードを見ればいいです.HashMapの問題点については,本稿で最初に述べた筆者の別の記事を参考にすればよい.
実はHashMapの同時問題の多くはputと拡張の同時によるものである.
ここではConcurrentHashMapがどのように解決されているかを見てみましょう.
拡張に関するコードは次のとおりです.
上記のコードに基づいて,ConcurrentHashMapがHashMap同時問題をどのように解決するかという疑問について簡単に説明する.まずnewの新しいhashテーブル(nextTable)が出てきて、大きさは元の2倍です.後のrehashはいずれもこの新しいhashテーブルに対して操作され、元のhashテーブル(table)には関与しない. は、その後、元のhashテーブルの各チェーンテーブルに対してrehashを行い、ヘッダノードのロックを取得しようとする.このステップは、rehashの過程でこのチェーンテーブルに対してput操作を実行できないことを保証します. sizeCtl制御により、拡張中にnewが複数の新しいhashテーブルを出さないようにする. 最後に、すべてのキー値ペアを新しいテーブル(nextTable)に再rehashした後、nextTableでtableを置き換えます.これにより、HashMapでgetと拡張が同時に行われる場合、nullにgetされる可能性があるという問題が回避されます. プロセス全体で、共有変数の格納と読み取りはすべてvolatileまたはCASによって、スレッドの安全を保証します.
6まとめ
マルチスレッド環境では、共有変数の操作に注意してください.Javaメモリモデルの観点から問題を十分に考慮しなければならない.
ConcurrentHashMapではUnsafeクラスの手法が多く用いられており,我々自身もUnsafeの実例を得ることができるが,生産においてはこれを推奨しない.多くの場合、我々は、例えばAtomicパケットの下にあるCAS動作を実現するために使用することができ、lockパケットの下にあるロック関連動作を実現するために使用することができるように、パケットに提供されるツールを組み合わせることによって実現することができる.
スレッドの安全なコンテナツール、例えばConcurrentHashMap、CopyOnWriteArrayList、ConcurrentLinkedQueueなどがよく使われています.これは、ConcurrentHashMapのようにUnsafeのgetobjectVolatileとsetObjectVolatileの原子的な更新配列のメタエレメントを作業中に使用できないため、これらのコンカレントツールが重要です.
私の微信の公衆番号に注目してください.
転載先:https://juejin.im/post/5ca89afa5188257e1d4576ff
HashMapスレッドセキュリティの問題については、筆者のもう一つの文章を参考にしてください.HashMapスレッドセキュリティの問題を深く解読します.
HashMapがマルチスレッド環境で安全でないという問題に対して,HashMapの著者らは,これはバグではなく,スレッド安全なHashMapを使用すべきであると考えている.
スレッドの安全なHashMapを得るには、次の方法があります.
このうち,前の2つの方式はグローバルロックの問題で深刻な性能問題があった.そのため、有名なコンカレントプログラミングの巨匠Doug LeaはJDK 1.5のjava.util.concurrentパッケージの下にコンカレントツールをたくさん追加しました.ここにはConcurrentHashMapというスレッドの安全なHashMapが含まれています.
本稿では,ConcurrentHashMapの実現原理を簡単に紹介する.
PS:JDK 8ベース
0 ConcurrentHashMapのJDK 7でのレビュー
ConcurrentHashMapはJDK 7とJDK 8で実装方式が大きく異なる.まず、JDK 7におけるConcurrentHashMapの原理を概説してみましょう.
0.1セグメントロックテクノロジ
HashTableがhashテーブル全体をロックするという問題に対して,ConcurrentHashMapはセグメントロックの解決策を提案した.
セグメントロックの考え方は、ロック時にhashテーブル全体をロックするのではなく、一部だけをロックすることです.
どのように実現しますか?これはConcurrentHashMapで最も重要なSegmentに使用されます.
ConcurrentHashMapには1つのSegment配列が維持されており、各SegmentはHashMapと見なすことができる.
Segment自体はReentrantLockを継承しており、それ自体がロックです.
Segmentでは、HashEntry配列によって内部のhashテーブルが維持されます.
各HashEntryはmapのK-Vを表し、HashEntryでチェーンテーブル構造を構成し、nextフィールドで次の要素を参照することができます.
上記の内容はソースコードに以下のように表示されます.
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {
// ... ...
/**
* The segments, each of which is a specialized hash table.
*/
final Segment[] segments;
// ... ...
/**
* Segment ConcurrentHashMap
*
* Segments are specialized versions of hash tables. This
* subclasses from ReentrantLock opportunistically, just to
* simplify some locking and avoid separate construction.
*/
static final class Segment<K,V> extends ReentrantLock implements Serializable {
// ... ...
/**
* The per-segment table. Elements are accessed via
* entryAt/setEntryAt providing volatile semantics.
*/
transient volatile HashEntry[] table;
// ... ...
}
// ... ...
/**
* ConcurrentHashMap list entry. Note that this is never exported
* out as a user-visible Map.Entry.
*/
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry next;
// ... ...
}
}
従って、JDK 7では、ConcurrentHashMapの全体構成を下図のように記述することができる.
上図から,我々のhash値が十分に分散している限り,putのたびにputが異なるsegmentに移行することが分かる.一方、segment自体はロックであり、putの場合、現在のsegmentは自分をロックします.この場合、他のスレッドはこのsegmentを操作できませんが、他のsegmentの操作には影響しません.これがロックセグメントがもたらすメリットです.
0.2スレッドの安全なput
ConcurrentHashMapのputメソッドのソースコードは次のとおりです.
public V put(K key, V value) {
Segment s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
// key hash segment, index segment , ensureSegment
if ((s = (Segment)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
// segment put
return s.put(key, hash, value, false);
}
最終的にsegmentのputメソッドが呼び出され、要素putがHashEntry配列に入力されます.ここでのコメントではロックに関する説明のみが表示されます.
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// segment
// tryLock
// , segment
// , scanAndLockForPut key hash node, , node , null
// scanAndLockForPut , CPU , 64 tryLock(), 64 , lock()
//
HashEntry node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry first = entryAt(tab, index);
for (HashEntry e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
//
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
//
unlock();
}
return oldValue;
}
0.3スレッドの安全な拡張(Rehash)
HashMapのスレッドセキュリティの問題の大部分は拡張(rehash)の過程にある.
ConcurrentHashMapの拡張は、各segmentのHashEntry配列に対してのみ拡張される.
上記putのソースコードから分かるように、ConcurrentHashMapはrehashの際にロックされているため、rehashの過程で他のスレッドがsegmentのhashテーブルを操作できないため、スレッドの安全が保証される.
1 JDK 8におけるConcurrentHashMapの初期化
無パラメータコンストラクション関数を例に、ConcurrentHashMapクラスが初期化されたときに何をするかを見てみましょう.
ConcurrentHashMap map = new ConcurrentHashMap<>();
まず、静的コードブロックと初期化クラス変数が実行されます.主に次のクラス変数を初期化します.
// Unsafe mechanics
private static final sun.misc.Unsafe U;
private static final long SIZECTL;
private static final long TRANSFERINDEX;
private static final long BASECOUNT;
private static final long CELLSBUSY;
private static final long CELLVALUE;
private static final long ABASE;
private static final int ASHIFT;
static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class> k = ConcurrentHashMap.class;
SIZECTL = U.objectFieldOffset
(k.getDeclaredField("sizeCtl"));
TRANSFERINDEX = U.objectFieldOffset
(k.getDeclaredField("transferIndex"));
BASECOUNT = U.objectFieldOffset
(k.getDeclaredField("baseCount"));
CELLSBUSY = U.objectFieldOffset
(k.getDeclaredField("cellsBusy"));
Class> ck = CounterCell.class;
CELLVALUE = U.objectFieldOffset
(ck.getDeclaredField("value"));
Class> ak = Node[].class;
ABASE = U.arrayBaseOffset(ak);
int scale = U.arrayIndexScale(ak);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
}
ここではUnsafeクラスが使用され、objectFieldOffsetメソッドは、sizeCtlなどの指定されたFieldのメモリ内のオフセット量を取得するために使用されます.
取得したこのオフセット量は主に何に使いますか?焦らずに、以下の分析で、出会ったときに検討すればいいです.
PS:Unsafeの紹介と使用について、筆者のもう一つの文章Unsafe類の紹介と使用を見ることができる.
2内部データ構造
まず、JDK 8でどのように定義されているストレージ構造をソースコードの観点から見てみましょう.
/**
* The array of bins. Lazily initialized upon first insertion.
* Size is always a power of two. Accessed directly by iterators.
*
* hash , put , 2 。
*/
transient volatile Node[] table;
/**
*
*
* Key-value entry. This class is never exported out as a
* user-mutable Map.Entry (i.e., one supporting setValue; see
* MapEntry below), but can be used for read-only traversals used
* in bulk tasks. Subclasses of Node with a negative hash field
* are special, and contain null keys and values (but are never
* exported). Otherwise, keys and vals are never null.
*/
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node next;
}
JDK 8とJDK 7の実装は大きく異なり,JDK 8ではSegmentの概念は使用されず,HashMapの実装方式に似ていることが分かった.
PS:HashMapの原理については、筆者のもう一つの文章HashMapの原理及び内部記憶構造を参考にすることができる
この構造は下図で説明できる
3スレッドの安全なhashテーブルの初期化
ConcurrentHashMapはtableというメンバー変数を用いてhashテーブルを持つことが前述から分かる.
tableの初期化には遅延初期化ポリシーが採用されており、putを初めて実行するときにtableを初期化します.
putメソッドのソースコードは以下の通りです(一時的に関連しないコードは省略します):
/**
* Maps the specified key to the specified value in this table.
* Neither the key nor the value can be null.
*
* The value can be retrieved by calling the {@code get} method
* with a key that is equal to the original key.
*
* @param key key with which the specified value is to be associated
* @param value value to be associated with the specified key
* @return the previous value associated with {@code key}, or
* {@code null} if there was no mapping for {@code key}
* @throws NullPointerException if the specified key or value is null
*/
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// key hash
int hash = spread(key.hashCode());
int binCount = 0;
for (Node[] tab = table;;) {
Node f; int n, i, fh;
// table ,
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// ...
}
// ...
}
initTableソースコードは以下の通りです.
/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node[] initTable() {
Node[] tab; int sc;
// #1
while ((tab = table) == null || tab.length == 0) {
// sizeCtl 0, else if
// #2
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// (this) SIZECTL int sc -1
// sizeCtl -1
// #3
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// ,
// #4
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 16
@SuppressWarnings("unchecked")
Node[] nt = (Node[])new Node,?>[n];
// #5
table = tab = nt; // hash , table
sc = n - (n >>> 2);
}
} finally {
// #6
sizeCtl = sc;
}
break;
}
}
return tab;
}
メンバー変数sizeCtlのConcurrentHashMapの1つの役割はHashMapのthresholdに相当し、hashテーブルの要素の個数がsizeCtlを超えると、拡張がトリガーされる.彼のもう一つの役割は、例えば、彼が-1に等しい場合、hashテーブルの初期化を実行しているスレッドがあることを示し、-1未満の値は、あるスレッドがhashテーブルに対してresizeを実行していることを示す.
この方法はまずsizeCtlが0未満であるか否かを判断し,0未満であれば現在のスレッドを直接準備状態のスレッドに変える.
sizeCtlが0以上の場合、現在のスレッドはCASでsizeCtlの値を-1に変更しようとします.修正に失敗したスレッドは次のループに入り、sizeCtl<0と判断し、yieldに住まれる.正常なスレッドを変更すると、次の初期化コードが実行されます.
new Node[]の前に、tableが空であるかどうかをもう一度チェックします.ここで二重チェックをするのは、別のスレッドが#1コードを実行した後に保留されている場合、別の初期化されたスレッドが#6のコードを実行した場合、sizeCtlが0より大きい値である場合、このスレッドを再カットして実行する場合、初期化を繰り返す可能性があるからです.この問題については下図の同時シーンで説明します.
次にhashテーブルを初期化し、sizeCtlの値を再計算し、最終的に初期化されたhashテーブルを返します.
次の図は、hashテーブルの再初期化を引き起こす可能性のあるいくつかの同時シーンを詳細に示しており、Thread 2が最終的にhashテーブルの初期化に成功したと仮定しています.
上の図から分かるように、Thread 1でsizeCtlの値更新を同時制御しなければ、Thread 1はnew Node[]に進む可能性がある.Thread 3では、二重の判断をしなければ、Thread 3もnew Node[]に進む.
4スレッドの安全なput
put操作は以下の2種類に分けられる
対応するソースコードは以下の通りです.
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node(hash, key, value, null)))
break; // no lock when adding to empty bin
}
static final Node tabAt(Node[] tab, int i) {
return (Node)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
static final boolean casTabAt(Node[] tab, int i,
Node c, Node v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
tabAtメソッドは、Unsafe.getobjectVolatile()によって配列対応index上の要素を取得し、getobjectVolatileは対応するメモリオフセット量に作用し、volatileメモリの意味を備えている.
取得が空の場合は、casで配列の指定indexに新しいノードを作成してみます.
4.2 hash衝突時
対応するソースコードは以下の通りです.
else {
V oldVal = null;
// f 4.1 tabAt
// , hash ,
synchronized (f) {
// :
// tab table,table rehash , index Node
// put , rehash , index Node f
// f,
if (tabAt(tab, i) == f) {
// put ,fh=-1
if (fh >= 0) {
binCount = 1;
for (Node e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node pred = e;
if ((e = e.next) == null) {
//
pred.next = new Node(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node p;
binCount = 2;
if ((p = ((TreeBin)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 8 , , HashMap , JDK7 ,
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
JDK 7におけるsegmentの概念とは異なり,JDK 8ではチェーンテーブルのヘッダノードを直接ロックとする.JDK 7では、HashMapはマルチスレッド同時putの場合にリングチェーンテーブルを形成する可能性があり、ConcurrentHashMapはこのロック方式により、同じ時間に1つのスレッドだけがあるチェーンテーブルに対してputを実行させ、同時問題を解決した.
5スレッドの安全な拡張
putメソッドの最後のステップは、sizeCtlの値を超えると拡張がトリガーされるhashテーブルの要素の個数を統計することです.
拡張コードはやや長いので、中の中国語の注釈を大まかに見て、次の分析を参考にしてください.実は私たちの主な目的はConcurrentHashMapがどのようにHashMapの同時問題を解決したのかを理解することです.この問題を持ってソースコードを見ればいいです.HashMapの問題点については,本稿で最初に述べた筆者の別の記事を参考にすればよい.
実はHashMapの同時問題の多くはputと拡張の同時によるものである.
ここではConcurrentHashMapがどのように解決されているかを見てみましょう.
拡張に関するコードは次のとおりです.
/**
* The array of bins. Lazily initialized upon first insertion.
* Size is always a power of two. Accessed directly by iterators.
* hash
*/
transient volatile Node[] table;
/**
* The next table to use; non-null only while resizing.
* hash , table, nextTable null。
*/
private transient volatile Node[] nextTable;
/**
* Adds to count, and if table is too small and not already
* resizing, initiates transfer. If already resizing, helps
* perform transfer if work is available. Rechecks occupancy
* after a transfer to see if another resize is already needed
* because resizings are lagging additions.
*
* @param x the count to add
* @param check if <0, don't check resize, if <= 1 only check if uncontended
*/
private final void addCount(long x, int check) {
// ----- start -----
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
// ----- end -----
// ----- start -----
if (check >= 0) {
Node[] tab, nt; int n, sc;
// sizeCtl , , transfer
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// ,nextTable hash
// ,transfer hash , hash
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// sizeCtl , ,
// ,transfer new hash
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
// ----- end -----
}
/**
* Moves and/or copies the nodes in each bin to new table. See
* above for explanation.
*/
private final void transfer(Node[] tab, Node[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
// hash , 2 , nextTable
Node[] nt = (Node[])new Node,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode fwd = new ForwardingNode(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// , nextTable null, table rehash nextTable
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// , rehash
// , , put
synchronized (f) {
if (tabAt(tab, i) == f) {
Node ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node lastRun = f;
for (Node p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node(ph, pk, pv, ln);
else
hn = new Node(ph, pk, pv, hn);
}
// setTabAt Unsafe.putObjectVolatile hash , volatile
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin t = (TreeBin)f;
TreeNode lo = null, loTail = null;
TreeNode hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode p = new TreeNode
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
上記のコードに基づいて,ConcurrentHashMapがHashMap同時問題をどのように解決するかという疑問について簡単に説明する.
6まとめ
マルチスレッド環境では、共有変数の操作に注意してください.Javaメモリモデルの観点から問題を十分に考慮しなければならない.
ConcurrentHashMapではUnsafeクラスの手法が多く用いられており,我々自身もUnsafeの実例を得ることができるが,生産においてはこれを推奨しない.多くの場合、我々は、例えばAtomicパケットの下にあるCAS動作を実現するために使用することができ、lockパケットの下にあるロック関連動作を実現するために使用することができるように、パケットに提供されるツールを組み合わせることによって実現することができる.
スレッドの安全なコンテナツール、例えばConcurrentHashMap、CopyOnWriteArrayList、ConcurrentLinkedQueueなどがよく使われています.これは、ConcurrentHashMapのようにUnsafeのgetobjectVolatileとsetObjectVolatileの原子的な更新配列のメタエレメントを作業中に使用できないため、これらのコンカレントツールが重要です.
私の微信の公衆番号に注目してください.
転載先:https://juejin.im/post/5ca89afa5188257e1d4576ff