AQSソース分析(RentrantLockを例にして)
10401 ワード
前言
Java同時プログラミングにおいては、Rentrantrant Lock、Semaphore、CountDwonLatchなどの同期ツールを使用してスレッドの安全を保証することがありますが、スレッドの安全をどのように保証するかはよく分かりません。本論文では、彼らの共通の依存関係
ケース
まず、Reentrant Lockの使用例を振り返ってみます。
ソース分析
初期化
上記のコードはまずReentrant Lock構造関数によってロックオブジェクトを作成しました。実際にどのようなオブジェクトを作成したかを見てみます。
公正ロックの一部のソースコード:
NonfairSyncのロジックにはFairSyncが含まれていますので、NonfairSyncから分析していきます。その前にNonfairSyncの継承図を見てみます。https://s1.ax1x.com/2020/03/26/Gpl0sO.png)
コアクラスのSyncがAQSを継承し、NonfairSyncとFairSyncがSyncを継承していることが見られます。この継承機構により、Reentrant Lockの加錠と解除ロックの論理が実現されました。ここからAQSには具体的な論理が実現されていません。管理スレッド待ちと取得ロックの仕組みが規定されています。また、RentrantLock(他の実装クラスのSemaphoreなど)は、いつロックを取得するかを具体的に実現しています。ロックの論理はいつ解放されますか?
AQSソース分析
AQSを説明する前に、まず一枚の図でその動作原理を説明します。
[外部チェーン写真の転送に失敗しました。ソースステーションは盗難防止チェーン機構があるかもしれません。画像を保存して直接アップロードすることを提案します。(img-xQLxYmBI-585575769929)(https://s1.ax1x.com/2020/03/30/GuSKbD.png)
スレッドがロックされていない場合、AQSは内部のNodeクラスを通じて、Threadをノードにカプセル化し、双方向チェーンの形式でロック待ちの過程でノードをキューに追加します。
次に、NonfairSyncの完全な動作メカニズムを初めから探ってみましょう。
compreAndSet State
例で
AQSクラスにあります
set Exclusive Owner Thread
acquire
上記の条件が成立しないなら、
tryAcquire
この方法はAQSでは具体的に実現されていません。これもAQSを実現するための独自の方法が必要です。
まず、現在のスレッドオブジェクトを取得し、
acquireQueued(addWaiter(Node.EXCLUSIVE)、arg)
鍵の直接取得が成功していない場合は、スレッドをキューに追加して待つ必要があり、どのようにキューに入れるかについては
acquire Queued
入隊操作が完了したら、私達はNodeがキューの中でロックをどうやって取得しているかをさらに検討したいです。次に
trueの場合は
最後にacquire方法に戻ります。スレッドがブロックされています。
締め括りをつける
最後にまとめてみます。Reentrant Lockの実現はAQSに依存しています。Reentrant Lockレベルはロックを取得する論理上のコードを書き換えただけです。そしてAQSは、双方向のキューによってThread情報を含むノードを格納し、
参照[1]JDK 8ソースコード [2]Reentrant Lockの実現からAQSの原理及び応用を見る。 [3]LockSupport原理解析 転載は出典を明記してください。
Java同時プログラミングにおいては、Rentrantrant Lock、Semaphore、CountDwonLatchなどの同期ツールを使用してスレッドの安全を保証することがありますが、スレッドの安全をどのように保証するかはよく分かりません。本論文では、彼らの共通の依存関係
AbstractQueuedSynchronizer
を詳細に解析することによって、それらがどのように資源に対する制御を実現しているかを探究する。分析を容易にするために、この文章は主にReentrant Lockから切り口として、本文を読んでから自分で他の実現類の実現ロジックを分析することができます。ケース
まず、Reentrant Lockの使用例を振り返ってみます。
public class Bootstrap {
private static final Lock lock = new ReentrantLock();
private int count = 0;
public void count() {
try {
lock.lock();
count++;
} finally {
lock.unlock();
}
}
}
このデモは簡単で、countをカウントし、RentrantLockによってマルチスレッド環境下のデータセキュリティを保証します。ソース分析
初期化
上記のコードはまずReentrant Lock構造関数によってロックオブジェクトを作成しました。実際にどのようなオブジェクトを作成したかを見てみます。
public ReentrantLock() {
sync = new NonfairSync();
}
字面からは不公平ロックの対象が作成されていることがわかりますが、RentrantLockのデフォルトサポートが非公平ロックであることは知っていますが、同じように公平ロックもサポートされていますので、new ReentrantLock(true)
を通じて公正ロックの対象を作成することができます。具体的なソースは以下の通りです。 public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
ここに来てロックを作る二つの方法を知っています。次に具体的なコードを通してこの二つの錠の違いを見てみます。公正ロックの一部のソースコード:
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
...
}
非公正ロックの実現部分のソースコード: static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
...
}
アンフェアロックが多く見られます。判断条件を教えます。現在のスレッドにロックを取得しようと試みています。成功すればそのまま独占ロックモードに設定します。これはちょうど不公平の原則を体現していて、直接に鍵を取ってみて、成功しないで更に他のスレッドと同じに待ちます。NonfairSyncのロジックにはFairSyncが含まれていますので、NonfairSyncから分析していきます。その前にNonfairSyncの継承図を見てみます。https://s1.ax1x.com/2020/03/26/Gpl0sO.png)
コアクラスのSyncがAQSを継承し、NonfairSyncとFairSyncがSyncを継承していることが見られます。この継承機構により、Reentrant Lockの加錠と解除ロックの論理が実現されました。ここからAQSには具体的な論理が実現されていません。管理スレッド待ちと取得ロックの仕組みが規定されています。また、RentrantLock(他の実装クラスのSemaphoreなど)は、いつロックを取得するかを具体的に実現しています。ロックの論理はいつ解放されますか?
AQSソース分析
AQSを説明する前に、まず一枚の図でその動作原理を説明します。
[外部チェーン写真の転送に失敗しました。ソースステーションは盗難防止チェーン機構があるかもしれません。画像を保存して直接アップロードすることを提案します。(img-xQLxYmBI-585575769929)(https://s1.ax1x.com/2020/03/30/GuSKbD.png)
スレッドがロックされていない場合、AQSは内部のNodeクラスを通じて、Threadをノードにカプセル化し、双方向チェーンの形式でロック待ちの過程でノードをキューに追加します。
次に、NonfairSyncの完全な動作メカニズムを初めから探ってみましょう。
compreAndSet State
例で
lock.lock()
方法を呼び出すと、NonfairSync
のロック()方法に入ります。第一歩はcompareAndSetState(0, 1)
によってロックを取得します。具体的なコードを確認します。AQSクラスにあります
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
Unisafeタイプの直接メモリを操作して、CAS方式でstate値を更新します。ここでstateのこの属性を引き出しています。AQSタイプのvolatileタイプの整数値です。この値は0ならロックが占有されていないと表しています。したがって、非公正ロックはまずこの方法を呼び出して、ロックを取ってみてください。成功すればsetExclusiveOwnerThread(Thread.currentThread());
方法に入ります。この方法のコードを見てみます。set Exclusive Owner Thread
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
この方法はAbstractOwnableSynchronizer
クラスに存在し、上記の継承図によりAQSの親クラスであることがわかる。この方法はexclusive Ownerantreadを私達が制定したスレッドに設定し、スレッドがロックされていることを示しています。ここで最初の条件が終了しました。ロックを直接取得できれば、現在のスレッドをロックして、流れが終了するという意味です。acquire
上記の条件が成立しないなら、
acquire(1)
に入ります。まずこの方法のコードを見てみます。 public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
私たちは主に二つの条件に関連していることが分かります。次にそれらを一つずつ説明します。tryAcquire
この方法はAQSでは具体的に実現されていません。これもAQSを実現するための独自の方法が必要です。
NonfairSync
に対応する実装は以下の通りである。 final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
#1 int c = getState();
#2 if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
#3 else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
ここではメインステップを番号付けして説明します。まず、現在のスレッドオブジェクトを取得し、
getState()
方法で現在のロックの状態を取得する。状態が0の場合、ロックがまだ占有されていないことを示して、直接にロックを取得しようと試みます。(このステップは最初の条件です。)取得に成功して現在のスレッドを独占ロックモードに設定しました。現在スレッドが独占ロックモードになっている場合、再度ロックを取得しなくても、直接stateを積算すればいいという意味ですが、stateオーバーフローに注意して、最後に新しいstateを設置すればいいです。上の条件が満たされないならfalseに戻ります。acquireQueued(addWaiter(Node.EXCLUSIVE)、arg)
鍵の直接取得が成功していない場合は、スレッドをキューに追加して待つ必要があり、どのようにキューに入れるかについては
addWaiter(Node.EXCLUSIVE)
方法があります。次に、具体的な実装コードを見てみます。 private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
まず、Nodeの初期化方法によってThreadをカプセル化し、ノードのタイプを設定することができ、ここでEXCLUSIVE
である。全体のチェーンにはheadノードとtailノードの概念があり、それぞれチェーンの頭と尾を指す。新しく加入されたノードは、まずテールノードによって迅速に設定され、テールは空ではないので、nodeのprevを直接ノードに向け、新しいテールノードをnodeノードに向けている。もし失敗したらenq(node)
方法を実行し、再入隊操作を行います。enq方法の具体的な実施は以下の通りです。 private Node enq(final Node node) {
for (;;) {
Node t = tail;
#1 if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
これはスピンの操作であると見られ、まずテールノードが空かどうかを判断します。もし空の代表チェーンテーブルがまだノードでないなら、空ノードを初期化してテールとヘッドノードを指します。もしtailが存在したら、やはり上のようにノードをチェーンの最後に設置します。ここに来てみると、nodeの入隊動作は古いテールノードに依存しており、新しいノードのフロントノードを介して新しいテールノードを設定していることがわかる。acquire Queued
入隊操作が完了したら、私達はNodeがキューの中でロックをどうやって取得しているかをさらに検討したいです。次に
acquireQueued
方法の具体的な実現を見てみます。 final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
#1 if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
#2 if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
まず、2つの属性を知る必要があります。failed
はロックの取得に失敗したかどうか、interupted
はスレッドが中断されたかどうかを表しています。また、この方法はスピンの操作であり、現在のノードの前駆ノードが先頭ノードであるかどうかを判断するステップと、もしそうであれば再度ロックの取得を試み、成功すれば新しいヘッドノードを現在のノードに設定して、interruptedに戻るステップとがあります。1)現在のノードの前駆ノードがヘッドノードではないか、またはロックの取得に失敗した場合、現在のスレッドをブロックする必要があるかどうかを判断する必要があります。ここでは、2つの判断条件shouldParkAfterFailedAcquire
およびparkAndCheckInterrupt()
に関連しています。以下で順次説明します。 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
#1 if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
#2 if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
#3 } else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
この方法は主に現在のノードについて傍受ノードの状態を判断し、waitStatusがSIGNAL
であれば、直接trueに戻る。waitStatusが0より大きい場合、前駆ノードがキャンセル状態であることを表し、直接に取得する前のノードは、前駆ノードのwaite Stutsが0より大きいということを知っている。waitStatusが0または3である場合、直接に、前駆ノードのwaitStatusをSIGNAL
に設定する。このようにして、現在のノードのスレッドがブロックされる必要があるかどうかを決定する。trueの場合は
parkAndCheckInterrupt
を呼び出して渋滞を実現する必要があります。具体的な実現を見てみます。 private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
LockSupport類のparkメソッドを使用してスレッドの詰まりを行うことが見られます。この操作はUnisafe方法にも依存しています。具体的なスレッドに対応するpermitライセンスを直接操作して、ブロックする必要があるかどうかを決定します。最後にスレッドのブロックマークを返しました。最後にacquire方法に戻ります。スレッドがブロックされています。
selfInterrupt
方法を実行する必要があります。これは中断機構に対する応答動作です。lockInterruptibly
の方法であれば、ブロック動作に違いがあります。上にはインタールドポイントをtrueに設定していますが、ロックInterruptibly方法は直接throw new InterruptedException();
で操作が中断され、finallyではロックの取得がキャンセルされます。締め括りをつける
最後にまとめてみます。Reentrant Lockの実現はAQSに依存しています。Reentrant Lockレベルはロックを取得する論理上のコードを書き換えただけです。そしてAQSは、双方向のキューによってThread情報を含むノードを格納し、
state
変数によってロックの取得状態を表している。スレッドは、CAS方式でロック状態を設定し、ノードの待ち情報によって、ロックを取得するか、それとも、スレッドをブロックするかを決定します。この文章を通して皆さんもAQSの実現原理に対して直観的な理解ができました。参照