ロック実現のAbstractQueuedSynchronizer——AQS
20727 ワード
一、AQSの紹介
キュー同期器AbstractQueuedSynchronizer(AQS)は、ロックまたは他の同期コンポーネントを構築するための基礎フレームワークであり、ロックを実現するための基礎である.volatile修飾int変数を使用して同期状態を表し、リソース取得スレッドのキューを完了するためにFIFOキューを維持します.
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private transient volatile Node head;//
private transient volatile Node tail;//
private volatile int state; //
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
...
上記のAQSのコードの一部から、AQSは同期状態を表すstate変数(volatile修飾)を含むクラスであることがわかります.メンテナンスキューの2つの参照ヘッダノードheadとテールノードtail(volatile修飾);同期状態の変化がスレッドの安全であることを保証するための3つの主要な方法が提供される.
ではAQSはどのようにロックを実現しているのでしょうか.ロックを実装する必要がある場合は、まずAQSを継承し、指定したメソッドを書き換え、次にAQSサブクラスをカスタムコンポーネント(ロック)の実装に組み合わせ、AQSのテンプレートメソッドを呼び出します.これらのテンプレートメソッドは、書き換えメソッド(テンプレートメソッドモード)を呼び出し、所望の効果を達成します.注意:指定したメソッドを書き換えるには、AQSの3つの主要なメソッドを使用して同期状態にアクセスまたは変更する必要があります.
AQSで書き換える方法は以下の通りです.
protected boolean tryAcquire(int arg) {} //
protected boolean tryRelease(int arg) {} //
protected int tryAcquireShared(int arg) {} //
protected boolean tryReleaseShared(int arg) {} //
protected boolean isHeldExclusively() {} // AQS
独占ロックの例を見てみましょう.
class Mutex implements Lock, java.io.Serializable {
// , , AQS
private static class Sync extends AbstractQueuedSynchronizer {
// ——
protected boolean isHeldExclusively() {
return getState() == 1;
}
// —— 0
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// —— , 0
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// Condition, condition condition
Condition newCondition() { return new ConditionObject(); }
}
// Sync
private final Sync sync = new Sync();
public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isHeldExclusively(); }
public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
以上、AQSを用いて排他ロックを実現した例である.Mutexは、同じ時点で1つのスレッドのみがロックを占有することを許可するカスタムロックです.静的内部クラスがAQSから継承されることを定義し,対応する方法を再書き込みし,排他的な取得解放ロックを実現した.書き換えたtryAcquire法では,CAS法を呼び出して同期状態を変化させたが,原子操作が1つのスレッドでしかできないためであった.書き換えtryReleaseメソッドで同期状態を0に設定します.このロックを使用する場合,Mutexのメソッドを呼び出すだけで,同期に関する詳細は同期器によって行われる.カスタムコンカレントコンポーネントの敷居を大幅に低減します.
二、AQSの実現原理分析
AQSの使い方がわかったら、その実現原理を分析してみましょう.同期器は、排他式と共有式に分けられます.一般的にはそのうちの1つしか実現されません.ここでは主に独占ロックの実装を解析する.
同期キュー
AQSは内部の同期キューに依存して同期状態の管理を行い、現在のスレッドが同期状態の取得に失敗すると、現在のスレッドや待機状態などの情報をノード(node)として同期キューに加え、現在のスレッドをブロックする.同期状態が解放されると、ヘッダノードのスレッドが起動し、同期状態を取得しようとします.
ノードクラス
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus; // ( )
volatile Node prev; //
volatile Node next; //
volatile Thread thread; //
Node nextWaiter; //
...
}
NodeはAQSメンテナンスの静的内部クラスです.スレッド参照(失敗)、待機状態、および前後ノードを保存します.ノードは同期キューを構成する基礎であり、同期器はヘッダノードとテールノード(tail)を有し、同期状態の取得に失敗したスレッドはノードがキューに参加する末尾となる.シンクロナイザの構造は次のとおりです.
注意:ノードを構築するプロセスでは、複数のスレッドが失敗するため、スレッドのセキュリティを保証する必要があります.どうやってやったの?AQSは、CASに基づいてエンドノードを構築する方法compareAndSetTailを提供し、ノードがキューに正しく追加されることを保証する.
排他的取得ロック
ロックを取得するプロセスを見てみましょう.AQSのacquire(int args)メソッドを呼び出して同期状態を取得します.
public final void acquire(int arg) {
if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
&&:短絡と、最初がfalseの場合、後の条件を判断しません.1つ目がtrueの場合、2つ目の条件も判断されます.&:最初がfalseである場合、後の条件も判断されます.
ステップ2:acquireQued(Node node,arg):ノードを「デッドサイクル」で同期状態を取得する.取得されないとノード内のスレッドがブロックされ、ブロックされたスレッドの起動は主に前駆ノードのデキューまたはブロックスレッドが中断されることによって実現される.
分析手順1:addWaiterメソッド
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) { // , ,
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node; // , CAS
}
}
enq(node); // , enq
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node())) //
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) { //CAS node
t.next = node;
return t;
}
}
}
キューが生成されていない場合、すなわちエンドノードがない場合、enqメソッドに入り、まずヘッダノードを作成し、その後、デッドループfor(;)ノードの正確な追加を保証するために、compareAndSetTail(CAS)という方法でノードがスレッドによって安全に追加されることを保証し(複数のスレッドが同期に失敗した後、スレッドが安全に追加されなければ、順序が混乱し、スレッドが失われる可能性があることを想像できる)、CASから戻った後にのみスレッドが戻ることができ、そうでなければ試行錯誤する.このenqメソッドは,同時追加ノードの要求をCASによりシリアル化する.
分析ステップ2:acquireQuedメソッドノードが同期キューに入ると、各ノード(またはスレッド)が自省して観察され、同期状態が取得されるとスピンから退出することができ、そうでなければスピンは継続する.
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); //
if (p == head && tryAcquire(arg)) { // ,false if
setHead(node); // ,
p.next = null; // help GC
failed = false;
return interrupted; // false, acquire , selfInterrupt();
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
このコードは主に2つのことをしました.
2つ目のことを分析します.
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) //ws=SIGNAL= -1
return true;
if (ws > 0) { //ws=CANCELLED= 1
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { //ws=CONDITION= -2 or PROPAGATE= -3
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
この方法は以下のような仕事をしています.各ノードは、その前駆ノードの状態を判断する:1.その前駆ノードはSIGNAL状態であり、trueを返し、現在のノードがpark(ブロック)すべきであることを示し、parkAndCheckInterrupt()を実行する.この方法は,次のように,LockSupportのpark法を用いて現在のスレッドをブロックする.
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
2.その前駆ノードのwaitStatus>0、すなわちCANCELLLEDである場合、CANCELLLEDのノードは無効となり、現在のノードは、前駆ノードのwaitStatsがCANCELLLEDではないことが見つかるまで、双方向キューとして常に前方に探して再接続される.3.その前駆ノードがSIGNAL状態ではなく、waitStatus<=0、すなわちCONDITIONまたはPROPACATEである場合、11行目のコードが実行され、CASメカニズムにより前駆ノードの状態がSIGNAL状態に更新される.
排他ロック
AQSを呼び出すreleaseメソッドは,同期状態を解放し,後続ノードを起動させることができる.
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // LockSupport
return true;
}
return false;
}
tryRelease解放に成功し,headノードを取得し,headノードのwaitStatusが0でなければunparkSuccessorメソッドを実行する.
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
この方法は以下の仕事をした:1.ヘッダノードのwaitStatus<0で、ヘッダノードのwaitStatusを0に設定します.2.ヘッダノードの次のノードsを取得し、s=nullまたはsのwaitStatus>0(キャンセルされた)の場合、後続の起動するノードとしてwaitStatus<=0のノードをキューの尾から前方に探す.3.nullに等しくないノードsが得られた場合、LockSupportのunparkメソッドを使用してブロックをキャンセルします.
まとめ:
同期状態を取得すると、AQSは同期キューを維持し、取得状態に失敗したスレッドがキューに追加され、キュー内でスピンします.キューを移動する条件は、前駆ノードがヘッダノードであり、同期状態が正常に取得されたことです.
同期状態が解放されると、ヘッダノードは後続ノードを起動します.