同時プログラミングの実践2:AbstractQueuedSyncronizer


AbstractQueuendSyncronizerは、AQSと略称し、java.util.co ncurrentパッケージのsynchronizerの基礎フレームであり、他のsynchronizer(Lock、Semaphore、CountDownLatch、FutreTaskなどを含む)は、それをベースとして構築しています。いくつかの実装の詳細に関連していますが、ここでは原理的なものに関係していません。原理は後の具体的な実現クラスにおいて説明されます。この文章は全体から局部までの方式で述べられています。全体の枠组みから一歩一歩细かくなっていますが、すべてのコードに関连していません。これらを知ったら、自分でソースを読んで勉强することができます。Dugg Leaは論文「The java.util.concurrent Synchronizer Frame ework」でAQSの設計理念を述べています。興味があるものはここで見てもいいです。http://gee.cs.oswego.edu/dl/papers/aqs.pdf)が見えます。文章が長すぎて、この文章はCoditionObjectに関わっていません。次の文章で述べます。
はい、これから始めます。Unisafeから始めます。
Usafe
まず使っているUnsafe類の方法について説明します。
objectFieldOffset:          ,         ,                    ;
compareAndSwapInt:         ,      ,  int   ;
compareAndSwapObject:         ,      ,  Java  ;
putObject:          ;
park:      ,         :1)     unpark  ;2)     unpark  ;3)     ;4)         ;5)    (  “  ”);
unpark:          ,          ,   park         ,   unpark      ,  unpark        park  。
AQSの運用
AQSは、Synchronizerの共通の特徴に従って、一連の基本的なフレームワークを提供し、これらの共通の特徴は、アイドルロックが存在するまで、スレッドをブロックするための1つ以上のacquire動作を含む。これらの動作を疑似コードで表現できます。acqure動作は次のように表現できます。
while (synchronization state does not allow acquire) {
	enqueue current thread if not already queued;
	possibly block current thread;
}
dequeue current thread if it was queued;
releaseの動作は、次のように表されてもよい。
update synchronization state;
if (state may permit a blocked thread to acquire)
	unblock one or more queued threads;
以下では実際の例を通して、AQSはどのように適用されていますか?これはjava.util.co ncurrent.Semaphoreの簡略版です。(非公平設定のSemaphoreのコードのみを含みます。)Semaphoreの役割はいくつかのリソースへのアクセスに最大スレッド数の制限を提供しています。最大許可値permitsを設定することができます。permits個のスレッドはacquireによって同時に呼び出すことができ、超えたものはスレッドがロックを解除するまでブロックされます。すなわち、release操作を呼び出します。Semaphoreのすべての操作は一つの内部クラスを通して行われます。この内部クラスはAQSのサブクラスです。 1)AQSの同期状態(すなわちロックの数)を設定します。 2)AQSのtryAcquire Shared方法を継承し、この方法はacquire動作のスレッドのロックを取得し、残りのロックの数を返します。残りのロックが負の場合、ロックの取得に失敗し、スレッドがブロックされることを表します。 3)AQSのtryRelease Shared方法を引き継いで、この方法はrelease操作のスレッドのロックを解除し、リリースが成功したかどうかを返します。trueは成功したと表し、成功したらブロッキングスレッドを起動する操作を行います。具体的なコードを見ます。
public class MySemaphore {
	private final NonfairSync sync;

	static final class NonfairSync extends SemaphoreAbstractQueuedSynchronizer {
		private static final long serialVersionUID = -2694183684443567898L;

		NonfairSync(int permits) {
			setState(permits);
		}

		protected final int tryAcquireShared(int acquires) {
			for (;;) {
				int available = getState();
				int remaining = available - acquires;
				if (remaining < 0 || compareAndSetState(available, remaining))
					return remaining;
			}
		}

		protected final boolean tryReleaseShared(int releases) {
			for (;;) {
				int current = getState();
				int next = current + releases;
				if (next < current) // overflow
					throw new Error("Maximum permit count exceeded");
				if (compareAndSetState(current, next))
					return true;
			}
		}
	}

	public MySemaphore(int permits) {
		sync = new NonfairSync(permits);
	}

	public void acquire() throws InterruptedException {
		sync.acquireSharedInterruptibly(1);
	}

	public void release() {
		sync.releaseShared(1);
	}
}
MySemaphoreはAQSの運用を示しています。同期状態の設定と管理、tryAcquire SharedとtryRelease Sharedの実現、MySemaphoreのacquireとrelease操作はAQSを使用して実現されます。複数のスレッドがacquireとreleaseを同時に操作する場合、AQSはどのように操作の正確性を保証しますか?AQSの内部メカニズムを学ぶことによって、以下は同期状態の管理から始まります。
同期状態
AQSにおける同期状態の説明と提供の操作方法は以下の通りである。
private volatile int state;
protected final int getState();//      
protected final void setState(int newState);//      
protected final boolean compareAndSetState(int expect, int update);//        
同期状態は32ビットの整数で、ロックの数を表しています。32ビットの整数を使うのは、主に一般的にロックの数がそんなに多くないことを考慮しています。tryAcquire SharedおよびtryRelease Sharedでは、スレッドがブロックされ、起動されているかどうかを判定するために同期状態が使用される。
AQSの主体プロセス
AQSは、2つのモード、独占モードおよび共有モードを提供し、対応する方法は以下の通りである。
	acquire:         ,    。
	acquireInterruptibly:         ,        。
	release:         。

	acquireShared:         ,    。
	acquireSharedInterruptibly:         ,        。
	releaseShared:         。
独占モードを共有モードの特例として考えることができます。すなわち、独占モードはロック数を1に設定する共有モードです。さらに、AQSは、時間制限を提供するグループの方法を提供する。
	tryAcquireNanos:           ,        ,          ,    。
	tryAcquireSharedNanos:           ,        ,          ,    。
このセットは上記の方法に比べて時間の制限を増加させ、時間が短い場合(1マイクロ秒以下)にポーリングを使用し、さもなければブロッキング方式を採用する。流れについての説明は共有モードを中心にしています。他の興味があればソースコードを見ることができます。マクロから見て、スレッドはacquire SharedInterruptiblyを通じてロックを取得し、操作が完了したら、release Sharedを通じてロックを解除します。まずロックを取る操作を見に来ました。
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
	if (Thread.interrupted())
		throw new InterruptedException();
	if (tryAcquireShared(arg) < 0) //     
		//      ,       
		doAcquireSharedInterruptibly(arg);
}
スレッド操作が完了したら、ロックを解除する操作:
public final boolean releaseShared(int arg) {
	if (tryReleaseShared(arg)) {   //     
		//    ,           
		doReleaseShared();
		return true;
	}
	return false;
}
スレッド取得に失敗した後(acquire SharedInterruptiblyを呼び出すことにより)、待ち行列に入り、閉塞状態に入ります。他のスレッドがロックを解除すると(release Sharedを呼び出すことにより)、ブロッキングスレッドを起動する動作が行われます。起動されたスレッドは待ち行列から自分を移動し、他の操作を実行します。操作は全部待ち行列を巡って行われています。次に待ち行列を見に来ます。
待ち行列
AQSでは、FIFOキューを使用して待ちスレッドを管理していますが、一階ロックの実現にはCLHロックの変形があり、CLHロックは通常スピンロックに使用されています。例を通してCLHロックを確認して、AQSのキューアルゴリズムを理解するのに役立ちます。
public class CLHLock {
	private static class Node {
		private volatile boolean locked = false;//   
		private volatile Node next;
	}

	private AtomicReference<Node> tail = new AtomicReference<>();
	private AtomicReference<Node> head = new AtomicReference<>();

	public CLHLock() {
		Node node = new Node();
		head.set(node);
		tail.set(node);
	}
	//            ,locked   true,        
	public void lock(int key) {
		Node newNode = new Node();
		newNode.locked = true;
		Node pred = null;
		while (true) {
			pred = tail.get();
			if (tail.compareAndSet(pred, newNode)) {
				pred.next = newNode;
				break;
			}
		}
		while (pred.locked) {
		}
	}
	//                ,  locked   false
	public void unlock(int key) {
		Node h = head.get();
		Node next = h.next;
		while (next != null) {
			if (head.compareAndSet(h, next)) {
				next.locked = false;
				break;
			}
			h = head.get();
			next = h.next;
		}
	}
}
CLHLockの考えはロック・キューを使って、後のスレッドは繰り返し前のスレッドの状態を調べます。状態がロックであればスピンが待ちます。そうでなければ通過します。キューを使用することは、パフォーマンス上の利点をもたらすことができます。現代のプロセッサアーキテクチャでは、各プロセッサ自身は、プロセッサが関心のあるデータを記憶するためのcacheを持っています。プロセッサはcacheからデータを取得する効率がメモリからデータを取得するよりもはるかに高く、プロセッサ間はバスを介して通信します。毎回1つのプロセッサだけがバスを使用することができます。このようなアーキテクチャに基づいて、性能を向上させるために、私たちはできるだけローカルcacheを使用して、バスを通じてデータにアクセスすることを避けます。キューを使用するとこのようなメリットがあります。各スレッドは自分のロックを修正すると後続のスレッドにしか影響がありません。他のスレッドは影響を受けません。このように一つのスレッドだけがデータの変化によってメモリにデータを取りに行き、データ競争を減少させ、性能を向上させます。AQSのキューアルゴリズムもこのような理念に基づいているが、CLHLockよりも複雑に実現され、データ競争をできるだけ減らすために、各ノードの状態はその後続のノードだけに関連し、キューのスレッドが順次起動するのを待って、獲得ロックの競争を減少させる。これから詳しく話します。AQSは、データを保存するために非ブロッキングキューを使用しています。(ブロックレスキューをもっと知りたいなら、私の「同時プログラミングの実践」を参照してください。)
static final class Node {
	static final Node SHARED = new Node();
	static final Node EXCLUSIVE = null;

	static final int CANCELLED =  1;
	static final int SIGNAL    = -1;
	static final int CONDITION = -2;
	static final int PROPAGATE = -3;

	volatile int waitStatus;
	volatile Node prev;
	volatile Node next;
	volatile Thread thread;
	Node nextWaiter;
	
	//      
	final boolean isShared() {
		return nextWaiter == SHARED;
	}
	
	//      
	final Node predecessor() throws NullPointerException {
		Node p = prev;
		if (p == null)
			throw new NullPointerException();
		else
			return p;
	}
	
	Node() {
	}
	
	Node(Thread thread, Node mode) { 
		this.nextWaiter = mode;
		this.thread = thread;
	}
	
	Node(Thread thread, int waitStatus) {
		this.waitStatus = waitStatus;
		this.thread = thread;
	}
}
 
next Waiterは2つの異なるモードを表しています。 1)共有モード(SHARED、複数のスレッドが通ることができる); 2)排他モード(EXCLUSIVEは、スレッドが一つしか通過できません。)私たちの説明は共有モードだけに関連しています。waitStatusはノードの状態を表しています。 1)CANCELED:ノード対応スレッドがキャンセルされました。 2)SIGNAL:ノードの次のノードのスレッドが起動されるのを待つ; 3)CONDITION:ノードに対応するスレッドがconditionの待ち行列の中で、後でconditionを言う時に関連します。 4)PROPAAGATE:このノードは処理を必要とせず、直接越えます。
次はアルゴリズムの主な流れを見ます。 1)キューに入る:スレッド取得に失敗した後、ノードを作成し、待ち行列の最後にノードを追加し、スレッドをブロックし、起動を待つ。 2)起動:他のスレッドはロックを解除し、キューの最初のノードを取って、ノード対応スレッドを起動する。 3)キューから出る:起動したスレッドはロックを取得し、成功したら自分を列から移動させ、空きのロックがあるかどうかを判断し、存在すれば次のノードを起動し続ける。毎回、最初のノードだけが起動され、複数のロックが同時に解放されると、後続のノードは、前のノードによって呼び覚まされ、データ競争を最小限に抑える。具体的なコードを見てみます。全体の流れから、スレッドはacquire SharedInterruptiblyによってロックを要求し、ロックを取得しようと試みた場合、失敗したらdoAcquire ShareShareShardInterruptibly処理に入ります。
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
	//      (    ),      
	final Node node = addWaiter(Node.SHARED);
	boolean failed = true;
	try {
		for (;;) {
			final Node p = node.predecessor();
			//         ,       
			if (p == head) { 
				//          ,        
				int r = tryAcquireShared(arg);
				if (r >= 0) {
					//          0       ,     
					setHeadAndPropagate(node, r);
					p.next = null; // help GC
					failed = false;
					return;
				}
			}
			//       ,       shouldParkAfterFailedAcquire  p      SIGNAL,       false,        。   shouldParkAfterFailedAcquire    false ?     Happens-before  (volatile  ),                (          ,                 ), :
			//1、p      SIGNAL  ,doReleaseShared  head     SIGNAL, doReleaseShared            ;
			//       :  t1  shouldParkAfterFailedAcquire  p    SIGNAL,      ,tryAcquireShared    ,     t2     ,  t1 t2      ,t2       t1   。
			//2、p      SIGNAL  ,doReleaseShared     head   ( SIGNAL),  doReleaseShared       , doReleaseShared         (tryReleaseShared)   ,   shouldParkAfterFailedAcquire  p                    。
			//  shouldParkAfterFailedAcquire  p      false    ,          ,           。
			if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
				throw new InterruptedException();
		}
	} finally {
		if (failed)
			cancelAcquire(node); //       
	}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
	int ws = pred.waitStatus;
	if (ws == Node.SIGNAL)
		return true;
	if (ws > 0) {
		//do something
	} else {
		compareAndSetWaitStatus(pred, ws, Node.SIGNAL); //  waitStatus
	}
	return false; //         
}
列に入る操作はaddWaiterによって行われ、それはまず一度列に入ることを試み、失敗したらenqにループしてみます。成功するまで:
private Node addWaiter(Node mode) {
	Node node = new Node(Thread.currentThread(), mode);
	Node pred = tail;
	if (pred != null) {
		//      ,       
		node.prev = pred;
		if (compareAndSetTail(pred, node)) {
			pred.next = node;
			return node;
		}
	}
	//pred  ,       ,  enq 
	enq(node);
	return node;
}
private Node enq(final Node node) {
	for (;;) { //     ,    
		Node t = tail;
		if (t == null) { //   ,     
			if (compareAndSetHead(new Node()))
				tail = head;
		} else {
			node.prev = t;
			if (compareAndSetTail(t, node)) {
				t.next = node;
				return t;
			}
		}
	}
}
ノードが待ち行列に入ると、ノードの前続ノードがheadではない場合、スレッドはpark AndCheckInterruptで閉塞状態になります。
private final boolean parkAndCheckInterrupt() {
	LockSupport.park(this);
	return Thread.interrupted();
}
スレッドブロッキングは、他のスレッドがrelease Sharedを呼び出してロックを解除した後、待ち行列の最初のノードをドレッサーSharedで起動するまで行われる。
private void doReleaseShared() {
	for (;;) {
		Node h = head;
		if (h != null && h != tail) {
			int ws = h.waitStatus;
			if (ws == Node.SIGNAL) {
				//     SIGNAL,           
				//  h   ,        ,              ,               ,                    
				if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
					continue;            // loop to recheck cases
				unparkSuccessor(h); //              ,  head    
			}
			// h      PROPAGATE,         :     t1 t2  doReleaseShared ,t1    h      0,  t2  h    SIGNAL,       h      PROPAGATE, t1    unparkSuccessor,    h      0,    h    PROPAGATE      
			//         ,      h waitStatus   PROPAGATE ?                       ,       :
			//1、          , t1 t2        tryReleaseShared  ,    ,               0,               ,    ;
			//2、  t1 h      0 ,    t3,t3       ,         0,  ,  t2    ,    3   :1)t3  setHead t2  ws == Node.SIGNAL  ,  t2      head    ,    ;2)t3  setHead t2   ws == Node.SIGNAL ,  h == head  , t2 h == head     ,    ,    ;3)t3  setHead t2  h == head  ,t2     ,   h        PROPAGATE, t3   h.waitStatus < 0   ,t3          ,    。
			//                     。
			else if (ws == 0 &&
					 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
				continue;                // loop on failed CAS
		}
		if (h == head) //  head     ,     
			break;
	}
}
private void unparkSuccessor(Node node) {
	int ws = node.waitStatus;
	if (ws < 0)
		compareAndSetWaitStatus(node, ws, 0); //   node      0
	Node s = node.next;
	if (s == null || s.waitStatus > 0) {
		//         :
		//1、s  ,  s         (          );
		//2、s     0,  s    ,         
		//       ,     tail      (head     ),                
		s = null;
		for (Node t = tail; t != null && t != node; t = t.prev)
			if (t.waitStatus <= 0)
				s = t;
	}
	if (s != null)
		LockSupport.unpark(s.thread); //    
}
スレッドが呼び覚まされたら、再度ロックの取得を試みます。成功したら、自分で列を移動し、列のコードをsetHeadAndPropagateに移すことにします。
private void setHeadAndPropagate(Node node, int propagate) {
	Node h = head; //   head
	setHead(node); //  head
	//           ,  :
	//1、        0,              ;
	//2、waitStatus  0,    PROPAGATE(        ,        );
	//3、h  (     h    )。
	//  node next  :
	//1、s  ,   node    ,       ,      ,      ;
	//2、s     ;
	//                   。
	if (propagate > 0 || h == null || h.waitStatus < 0) {
		Node s = node.next;
		if (s == null || s.isShared())
			doReleaseShared(); //       
	}
}
ここで起動操作とrelease時の操作が一致しています。複数のスレッドが同時にrelease操作を起動している場合、複数のロックが解除されていますが、一回だけdorelease Sharedの操作が行われる可能性があります。ここで補足しました。キューの最初のスレッドが起動され、ロックを取得したら、再度dorelease Sharedを起動し、次のスレッドを呼び起こします。鍵が全部無くなるまで、あるいは列が空きます。このようにスレッドを一つずつ起動して、順番にロックを取得して、できるだけデータ競争を減らすことができます。
ブロック機構
AQSのブロック操作はLockSupport類を使用して、最終的にUnisafeのparkとunparkを使って、下記のように実現します。
public class LockSupport {
    private LockSupport() {}

    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long parkBlockerOffset;

    static {
        try {
            parkBlockerOffset = unsafe.objectFieldOffset
                (java.lang.Thread.class.getDeclaredField("parkBlocker"));
        } catch (Exception ex) { throw new Error(ex); }
    }
	......
	//    
    public static void unpark(Thread thread) {
        if (thread != null)
            unsafe.unpark(thread);
    }
	//    
    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        unsafe.park(false, 0L);
        setBlocker(t, null);
    }
    ......
}
結尾語
AQSは私たちにプログラムの複雑さを十分に示しています。複数のスレッドのインタラクションの下で、状況は非常に複雑になります。流れ全体を一つの全体として分析する必要があります。したがって、ソースコードと結合してこの文章を見てください。これは私自身のコードの分析結果です。学習の参考として、間違いがあるかもしれません。問題を発見したらメッセージをください。ありがとうございます。ありがとうございます。