AQS CAS分析
46995 ワード
CAS
CAS(Compare And Swap)を比較し交換した.マルチスレッドパラレルでのロックによるパフォーマンス損失を解決するメカニズムです.CAS操作には、メモリ位置(V)の予想値(A)と新しい値(B)の3つの操作数が含まれます.メモリ位置が予想値と一致すると、プロセッサは自動的に新しい値をその位置値に更新します.そうしないと、プロセッサは何もしません.いずれの場合も、CAS命令の前に位置の値が返されます.CASは「位置VはAを含むべきだと思う.この値が含まれている場合はB値をこの位置に置く」と効果的に説明している.そうでなければ、その位置を変更しないで、この位置の現在の値だけを教えてください.JAVAでは、sum.misc.Unsafeクラスはnativeハードウェアレベルの原子操作を提供してこのCASを実現し、java.util.concurrentパッケージの下の多くのクラスがこのUnsafe.javaクラスを使用しているが、Unsafeの具体的な実装についてはここでは議論しない.
CASの典型的な応用
JAva.util.concurrent.atomicパッケージのクラスの多くはCAS操作を用いて実現されているが,これらの原子クラスの実現をAtomicIntegerの部分実装で大まかに説明する.説明の前に1つの知識点がvalatileキーワードで、valatileについて前に投稿したことがあるので、閲覧することができます.
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private volatile int value;// int
// ...
// , int
public AtomicInteger(int initialValue) {
value = initialValue;
}
// , int 0
public AtomicInteger() {
}
//
public final int get() {
return value;
}
// newValue
public final void set(int newValue) {
value = newValue;
}
// , newValue
public final int getAndSet(int newValue) {
/**
* for CAS
* CAS
* */
for (;;) {
int current = get();
if (compareAndSet(current, newValue))
return current;
}
}
// update, expect
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
// current, current+1
public final int getAndIncrement() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return current;
}
}
// ,
}
一般的に競合が特に激しくない場合、synchronizedキーワードを使用するよりも、このパケットを使用した原子の動作性能が効率的であることがわかります(getAndSet()を参照すると、リソース競合が非常に激しい場合、このforサイクルは長い間ジャンプできない可能性がありますが、この場合はリソース競合を低減することを考慮する必要があります).多くのシーンでこれらの原子クラス操作を使用する可能性があります.典型的な応用はカウントであり、マルチスレッドの場合、スレッドセキュリティの問題を考慮する必要がある.通常、最初のイメージは次のようになります.
public class Counter {
private int count;
public Counter(){}
public int getCount(){
return count;
}
public void increase(){
count++;
}
}
上記のクラスはマルチスレッド環境でスレッドセキュリティの問題がありますが、この問題を解決する最も簡単な方法はロックをかけることで、以下のように調整することができます.
public class Counter {
private int count;
public Counter(){}
public synchronized int getCount(){
return count;
}
public synchronized void increase(){
count++;
}
}
これは悲観的なロックの実現に似ていて、私はこのリソースを取得する必要があります.それでは、私は彼にロックをかけて、他のスレッドはこのリソースにアクセスすることができなくて、私が操作した後にリソースのロックを解放するまで.悲観ロックの効率は楽観ロックに及ばないことを知っています.Atomicの原子類の実現は楽観ロックに似ており、synchronized関係字を使うよりも効率が高いと言われています.この方法を推奨します.
public class Counter {
private AtomicInteger count = new AtomicInteger();
public Counter(){}
public int getCount(){
return count.get();
}
public void increase(){
count.getAndIncrement();
}
}
//
public class Counter {
private volatile int count = 0;
private static final ReadWriteLock lock = new ReentrantReadWriteLock();
public Counter(){}
public int getCount(){
Lock readLock = lock.readLock();
readLock.lock();
try{
return count;
} finally {
readLock.unlock();
}
}
public void increase(){
Lock writeLock = lock.writeLock();
writeLock.lock();
try{
count+=1;
} finally {
writeLock.unlock();
}
}
}
AQS (AbstractQueuedSynchronizer)
AQS
AQS(AbstractQueuedSynchronizer)は、JDKの下で提供されるFIFO待ちキューに基づくブロックロックおよび関連する同期器を実現するための同期フレームワークのセットである.この抽象クラスは,状態を表すために原子int値を使用できるいくつかの同期器ベースクラスとして設計されている.CountDownLatchやReentrantLockのようないくつかのソースコード実装を見たことがある場合、その内部にAbstractQueuedSynchronizerを継承した内部クラスSyncがあることがわかります.これらのクラスはAQSフレームワークに基づいて実現された同期器であり,類似の同期器はJUCの下に少なくないことがわかる.AQSの使い方
上述したように、AQSは、任意の状態を表すことができる状態情報に関する単一の整数を管理する.たとえば、Semaphoreは残りのライセンス数を表現し、ReentrantLockはスレッドが何回ロックを要求したかを表現します.FutureTaskは、タスクのステータス(開始、実行、完了、キャンセル)を表現するために使用します.
AQSを使用して同期器を実装するには、次のいくつかの方法を上書きし、getState,setState,compareAndSetStateのいくつかの方法を使用して取得状態を設定する必要があります.
以上の方法はすべて実装する必要はなく、取得したロックの種類に応じて異なる実装方法を選択することができ、独占(排他)取得ロックをサポートする同期器はtryAcquire、tryRelease、isHeldExclusivelyを実現すべきであり、共有取得をサポートする同期器はtryAcquireShared、tryReleaseShared、isHeldExclusivelyを実現すべきである.次に、CountDownLatchの例として、AQSに基づいて同期器が実装され、CountDownLatchが現在のカウントを同期状態で保持し、countDownメソッドがreleaseを呼び出してカウンタが減少することを説明する.カウンタが0の場合、すべてのスレッドの待機を解除します.awaitはacquireを呼び出し、カウンタが0の場合、acquireはすぐに戻ります.そうしないとブロックされます.通常、タスクが他のタスクが完了してから実行を続行する必要がある場合に使用されます.ソースコードは次のとおりです.
public class CountDownLatch {
/**
* AQS Sync
* AQS state count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
// AQS getState()
setState(count);
}
int getCount() {
// AQS getState()
return getState();
}
//
protected int tryAcquireShared(int acquires) {
// state 0 , 0 1, -1
return (getState() == 0) ? 1 : -1;
}
//
protected boolean tryReleaseShared(int releases) {
// for Decrement count ;
// count 0 , false signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
// CountDownLatch
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
// count 0,
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// , count 0 timeout 。 count 0 , true
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
// count
public void countDown() {
sync.releaseShared(1);
}
// count
public long getCount() {
return sync.getCount();
}
public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}
}
以下はAQSに基づいて書かれた独占ロックの簡単な例です.
package com.stylefeng.guns.AQS;
import java.util.Collection;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
* @author climb.s
* @date 2018/10/12 14:39 */
public class Mutex {
// Sync
private final Sync sync = new Sync();
// ,
private static class Sync extends AbstractQueuedSynchronizer {
//
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 0
@Override
protected boolean tryAcquire(int state) {
// CAS , , , (0: 1: )
if (compareAndSetState(0, 1)) {
//
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// , 0
@Override
protected boolean tryRelease(int state) {
if (getState() == 0) throw new IllegalMonitorStateException(" , ");
setExclusiveOwnerThread(null);
setState(0);
return true;
}
}
//
public Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
//
public void lock() {
sync.acquire(1);
}
//
public void unlock() {
sync.release(0);
}
}
package com.stylefeng.guns.AQS;
import java.util.Random;
/** @author climb.s
* @date 2018/10/12 14:56 */
public class MutexExample {
private static Random random = new Random(47);
private static Mutex mutex = new Mutex();
private static class Weight implements Runnable {
String name;
public Weight(String name) {
this.name = name;
}
@Override
public void run() {
mutex.lock();
System.out.println(name + " !");
System.out.println(name + " :" + (random.nextInt(10) + 3));
System.out.println(name + " !");
printQueuedThreads();
mutex.unlock();
}
private static void printQueuedThreads() {
System.out.print(" :");
for (Thread thread : mutex.getQueuedThreads()) {
System.out.print(thread.getName() + " ");
}
System.out.println();
}
}
public static void main(String[] args) {
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(new Weight("Weight-" + i), "Thread-" + i);
}
for (int i = 0; i < 10; i++) {
threads[i].start();
}
}
}