Java同時実行(基礎知識):ロックと同期ツールクラスの表示
15284 ワード
ロックの表示
ロックインタフェースはJava 5.0の新しいインタフェースで、このインタフェースの定義は以下の通りです.
内蔵ロック機構とは異なり、ロックは無条件、ポーリング可能、タイミング的、および割り込み可能なロック取得操作を提供し、すべてのロックおよびロック解除方法が表示される.ReentrantLockはロックインタフェースを実現しており、内蔵ロックに比べてReentrantLockは以下の利点がある:ロック取得操作を中断することができ、ロック取得時にタイムアウト時間を設定することができる.次のコードは、ロックインタフェースの標準的な使用方法を示します.
1.1、ポーリングロックとタイミングロック
タイミング可能なポーリング可能なロック取得方式はtryLock法によって実現され,無条件のロック取得方式に比べて完全なエラー返信機構を有する.tryLockメソッドの説明は次のとおりです.
内蔵ロックでは、デッドロックは深刻な問題であり、リカバリプログラムの唯一の方法はプログラムを再起動することであり、デッドロックを防止する唯一の方法はプログラムを構築する際に不一致なロック順序を避けることであり、ポーリング可能なロックとは別の選択を提供することができる:tryLock()ですべてのロックを取得しようとし、必要なすべてのロックを取得できない場合は、取得したロックを解放し、すべてのロックを再試行します.次の例では、tryLockを使用してデッドロックを回避する方法を示します.tryLockを使用して2つのロックを取得し、同時に取得できない場合は、ロールバックして再試行します.
1.2、中断可能なロック取得操作
lockInterruptibly法は、ロックを取得しながら割り込みに対する応答を維持することができ、この方法は以下のように説明される.
1.3、読み取り-書き込みロック
Java 5は、Lockインタフェースを追加するほか、ReadWriteLockインタフェース、すなわち読み書きロックを追加し、このインタフェースは以下のように定義されている.
リード・ライト・ロックでは、複数のリード・スレッドを同時に実行できますが、ライト・スレッドとリード・スレッドの同時実行は許可されません.ライト・スレッドとライト・スレッドの同時実行も許可されません.次の例では、ReentrantReadWriteLockパッケージMapを使用して、複数のスレッド間で安全に共有できます.
同期ツールクラス
2.1、ロック
ロックは、他のスレッドで実行されている操作のセットを完了する前に、1つ以上のスレッドが待機することを可能にする同期支援クラスです.
与えられたカウントでCountDownLatchを初期化します.countDown()メソッドが呼び出されるため、現在のカウントがゼロに達するまでawaitメソッドはブロックされます.その後、待機しているすべてのスレッドが解放され、awaitの後続呼び出しはすぐに返されます.この現象は1回しか現れません.カウントはリセットできません.カウントをリセットする必要がある場合は、CyclicBarrierを使用することを考慮してください.
次の例では、TestHarnessが指定されたタスクを同時に実行するスレッドを作成し、2つのロックを使用して「開始ゲート」と「終了ゲート」を表します.各スレッドは、最初に開始ゲートで待機し、すべてのスレッドが準備されてから実行が開始されることを保証します.各スレッドが実行する最後のことは、呼び出し終了ゲートのcountDownメソッドを1に減らすことです.これにより、すべての作業スレッドが実行されるまでプライマリ・スレッドを効率的に待機させることができます.そのため、消費された時間を統計できます.
2.2、FutureTask
FutureTaskはキャンセル可能な非同期計算を表します.計算の開始とキャンセルの方法、計算が完了したかどうかをクエリーする方法、計算結果を取得する方法を使用して、Futureの基本的な実装を提供します.計算が完了した場合にのみ結果を取得できます.計算がまだ完了していない場合はgetメソッドをブロックします.計算が完了すると、計算を再開したりキャンセルしたりすることはできません.FutureTaskのメソッドの概要は次のとおりです.
FutureTaskは、計算結果を使用する前に起動できる時間の長い計算を表すために使用できます.次のコードは、高いオーバーヘッドの計算をシミュレートすることです.start()メソッドを呼び出して計算を開始し、結果が必要な場合にgetを呼び出して結果を得ることができます.
2.3、信号量
概念的には、信号量はライセンスセットを維持する.必要に応じて、ライセンスが使用可能になる前に各acquire()がブロックされ、ライセンスの取得が待機します.各release()にライセンスが追加され、ブロックされている取得者が解放される可能性があります.ただし,実際のライセンスオブジェクトを使用せずに,Semaphoreは利用可能なライセンスの番号のみをカウントし,対応する行動をとる.
Semaphoreは、通常、特定のリソース(物理的または論理的)にアクセスできるスレッドの数を制限するために使用されます.たとえば、次のクラスでは、信号量を使用してコンテンツプールへのアクセスを制御します.
1つを取得する前に、各スレッドは信号量から許可を取得し、使用可能であることを保証しなければならない.スレッドが終了すると、アイテムをプールに戻し、許可を信号量に戻し、他のスレッドがアイテムを取得できるようにします.ただし、acquire()を呼び出すと同期ロックは保持されません.これは、アイテムをプールに戻すことを阻止するためです.信号量は、プールへのアクセスを制限するために必要な同期をカプセル化し、このプール自体の一貫性を維持するために必要な同期とは別個である.
信号量を1に初期化し、使用時に使用可能なライセンスが最大1つしかないようにし、互いに反発するロックとして使用することができる.これは、通常、バイナリ信号量とも呼ばれます.これは、1つの使用可能なライセンス、またはゼロの使用可能なライセンスの2つのステータスしかないためです.このように使用する場合、バイナリ信号量は、所有者ではなくスレッドによって「ロック」を解放することができる(信号量に所有権の概念がないため).これは、デッドロックリカバリなどの特定のコンテキストで役立ちます.
Semaphoreの構築方法は、公平なパラメータをオプションで受け入れる.falseに設定すると、スレッドがライセンスを取得する順序は保証されません.特に、侵入は許可され、すなわち、待機しているスレッドの前にacquire()を呼び出すスレッドに許可を割り当てることができ、論理的には、新しいスレッドは、待機スレッドキューのヘッダに自分を置くことができる.公平にtrueに設定された場合、信号量は、任意の呼び出し取得方法のスレッドに対して、これらのメソッドを呼び出す順序(すなわち、先行先出;FIFO)に従ってスレッドを選択し、許可を得ることを保証する.なお、FIFOソートは、これらのメソッド内の指定された内部実行ポイントに必ず適用される.したがって、あるスレッドが別のスレッドよりも先にacquireを呼び出す可能性がありますが、そのスレッドの後にソートポイントに到達し、メソッドから返される場合も同様です.また、非同期tryAcquireメソッドは、公平な設定ではなく、任意の使用可能なライセンスを使用することに注意してください.
通常、すべてのスレッドがリソースにアクセスできるように、リソースアクセスを制御するための信号量を公平に初期化する必要があります.他の種類の同期制御に信号量を使用する場合、非公平なソートのスループットの利点は、通常、公平な考慮よりも重要である.
Semaphoreはまた、acquireと複数のライセンスを同時に解放する便利な方法を提供します.公平をtrueに設定していない場合、これらの方法を使用すると、不確定な延期のリスクが増加することに注意してください.
メモリ整合性効果:release()などのスレッドで「解放」メソッドを呼び出す前の操作happen-before別のスレッドでは、acquire()などの成功した「取得」メソッドの直後の操作が続きます.
2.4、柵
CyclicBarrierは、共通のバリアポイントに達するまでスレッドのセットを互いに待つことができる同期支援クラスです.一定サイズのスレッドのセットに関連するプログラムでは、これらのスレッドは常に互いに待たなければならない.この場合、CyclicBarrierは有用である.このbarrierは、待機スレッドを解放した後に再利用できるため、ループのbarrierと呼ばれる.
CyclicBarrierは、一連のスレッドの最後のスレッドが到着した後(ただし、すべてのスレッドを解放する前に)、各バリアポイントでのみ実行されるオプションのRunnableコマンドをサポートします.このバリア操作は、すべての参加スレッドを継続する前に共有状態を更新する場合に便利です.
使用例:並列分解設計でbarrierを使用する例を次に示します.
この例では、各workerスレッドはマトリクスの1行を処理し、すべての行を処理する前に、スレッドはバリアで待機します.すべてのローが処理されると、提供されたRunnableバリア操作が実行され、ローがマージされます.連結者が解決策が見つかったと判断した場合、done()はtrueを返し、すべてのworkerスレッドが終了します.
バリア動作が実行中に保留中のスレッドに依存しない場合、スレッドグループ内の任意のスレッドは、解放されたときに実行できます.この操作を容易にするために、await()を呼び出すたびに、バリアに到達できるスレッドのインデックスが返されます.次に、バリア操作を実行するスレッドを選択できます.
失敗した同期試行に対して、CyclicBarrierは、中断、失敗、タイムアウトなどの原因でスレッドがバリアポイントから早すぎると、バリアポイントで待機している他のスレッドもBrokenBarrierExceptionによって異常に離れる破壊モードを使用します.
メモリ整合性効果:スレッドでawait()を呼び出す前の操作happen-beforeバリア操作の一部である操作は、happen-beforeが別のスレッドからawait()に対応して正常に返された操作に続く.
ロックインタフェースはJava 5.0の新しいインタフェースで、このインタフェースの定義は以下の通りです.
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
内蔵ロック機構とは異なり、ロックは無条件、ポーリング可能、タイミング的、および割り込み可能なロック取得操作を提供し、すべてのロックおよびロック解除方法が表示される.ReentrantLockはロックインタフェースを実現しており、内蔵ロックに比べてReentrantLockは以下の利点がある:ロック取得操作を中断することができ、ロック取得時にタイムアウト時間を設定することができる.次のコードは、ロックインタフェースの標準的な使用方法を示します.
Lock lock = new ReentrantLock();
...
lock.lock();
try{
...
} finally {
lock.unlock();
1.1、ポーリングロックとタイミングロック
タイミング可能なポーリング可能なロック取得方式はtryLock法によって実現され,無条件のロック取得方式に比べて完全なエラー返信機構を有する.tryLockメソッドの説明は次のとおりです.
boolean tryLock(): 。 , , true。 , false。
boolean tryLock(long time, TimeUnit unit) throws InterruptedException:
, , 。
, true。 , , , , :
;
, ;
, true。
:
;
, ,
InterruptedException, 。
, false。 time 0, 。
内蔵ロックでは、デッドロックは深刻な問題であり、リカバリプログラムの唯一の方法はプログラムを再起動することであり、デッドロックを防止する唯一の方法はプログラムを構築する際に不一致なロック順序を避けることであり、ポーリング可能なロックとは別の選択を提供することができる:tryLock()ですべてのロックを取得しようとし、必要なすべてのロックを取得できない場合は、取得したロックを解放し、すべてのロックを再試行します.次の例では、tryLockを使用してデッドロックを回避する方法を示します.tryLockを使用して2つのロックを取得し、同時に取得できない場合は、ロールバックして再試行します.
public boolean transferMoney(Account fromAcct, Account toAcct, DollarAmount amount, long timeout, TimeUnit unit) throws InsufficientFundsException, InterruptedException {
long fixedDelay = 1;
long randMod = 2;
long stopTime = System.nanoTime() + unit.toNanos(timeout);
while (true) {
if (fromAcct.lock.tryLock()) {
try {
if (toAcct.lock.tryLock()) {
try {
if (fromAcct.getBalance().compareTo(amount) < 0)
throw new InsufficientFundsException();
else {
fromAcct.debit(amount);
toAcct.credit(amount);
return true;
}
} finally {
toAcct.lock.unlock();
}
}
} finally {
fromAcct.lock.unlock();
}
}
if (System.nanoTime() < stopTime)
return false;
NANOSECONDS.sleep(fixedDelay + rnd.nextLong() % randMod);
}
}
1.2、中断可能なロック取得操作
lockInterruptibly法は、ロックを取得しながら割り込みに対する応答を維持することができ、この方法は以下のように説明される.
void lockInterruptibly() throws InterruptedException:
, 。
, , 。
, , , , :
;
, 。
:
;
, ,
InterruptedException, 。
1.3、読み取り-書き込みロック
Java 5は、Lockインタフェースを追加するほか、ReadWriteLockインタフェース、すなわち読み書きロックを追加し、このインタフェースは以下のように定義されている.
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
リード・ライト・ロックでは、複数のリード・スレッドを同時に実行できますが、ライト・スレッドとリード・スレッドの同時実行は許可されません.ライト・スレッドとライト・スレッドの同時実行も許可されません.次の例では、ReentrantReadWriteLockパッケージMapを使用して、複数のスレッド間で安全に共有できます.
public class ReadWriteMap <K,V> {
private final Map<K, V> map;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock r = lock.readLock();
private final Lock w = lock.writeLock();
public ReadWriteMap(Map<K, V> map) {
this.map = map;
}
public V put(K key, V value) {
w.lock();
try {
return map.put(key, value);
} finally {
w.unlock();
}
}
public V remove(Object key) {
w.lock();
try {
return map.remove(key);
} finally {
w.unlock();
}
}
public void putAll(Map<? extends K, ? extends V> m) {
w.lock();
try {
map.putAll(m);
} finally {
w.unlock();
}
}
public void clear() {
w.lock();
try {
map.clear();
} finally {
w.unlock();
}
}
public V get(Object key) {
r.lock();
try {
return map.get(key);
} finally {
r.unlock();
}
}
public int size() {
r.lock();
try {
return map.size();
} finally {
r.unlock();
}
}
public boolean isEmpty() {
r.lock();
try {
return map.isEmpty();
} finally {
r.unlock();
}
}
public boolean containsKey(Object key) {
r.lock();
try {
return map.containsKey(key);
} finally {
r.unlock();
}
}
public boolean containsValue(Object value) {
r.lock();
try {
return map.containsValue(value);
} finally {
r.unlock();
}
}
}
同期ツールクラス
2.1、ロック
ロックは、他のスレッドで実行されている操作のセットを完了する前に、1つ以上のスレッドが待機することを可能にする同期支援クラスです.
与えられたカウントでCountDownLatchを初期化します.countDown()メソッドが呼び出されるため、現在のカウントがゼロに達するまでawaitメソッドはブロックされます.その後、待機しているすべてのスレッドが解放され、awaitの後続呼び出しはすぐに返されます.この現象は1回しか現れません.カウントはリセットできません.カウントをリセットする必要がある場合は、CyclicBarrierを使用することを考慮してください.
次の例では、TestHarnessが指定されたタスクを同時に実行するスレッドを作成し、2つのロックを使用して「開始ゲート」と「終了ゲート」を表します.各スレッドは、最初に開始ゲートで待機し、すべてのスレッドが準備されてから実行が開始されることを保証します.各スレッドが実行する最後のことは、呼び出し終了ゲートのcountDownメソッドを1に減らすことです.これにより、すべての作業スレッドが実行されるまでプライマリ・スレッドを効率的に待機させることができます.そのため、消費された時間を統計できます.
public class TestHarness {
public long timeTasks(int nThreads, final Runnable task)
throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);
for (int i = 0; i < nThreads; i++) {
Thread t = new Thread() {
public void run() {
try {
startGate.await();
try {
task.run();
} finally {
endGate.countDown();
}
} catch (InterruptedException ignored) {
}
}
};
t.start();
}
long start = System.nanoTime();
startGate.countDown();
endGate.await();
long end = System.nanoTime();
return end - start;
}
}
2.2、FutureTask
FutureTaskはキャンセル可能な非同期計算を表します.計算の開始とキャンセルの方法、計算が完了したかどうかをクエリーする方法、計算結果を取得する方法を使用して、Futureの基本的な実装を提供します.計算が完了した場合にのみ結果を取得できます.計算がまだ完了していない場合はgetメソッドをブロックします.計算が完了すると、計算を再開したりキャンセルしたりすることはできません.FutureTaskのメソッドの概要は次のとおりです.
boolean cancel(boolean mayInterruptIfRunning)
。
protected void done()
isDone( ) , 。
V get() throws InterruptedException, ExecutionException
, , 。
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
, , ( )。
boolean isCancelled()
, true。
boolean isDone()
, true。
void run()
Future , 。
protected boolean runAndReset()
, Future , , 。
protected void set(V v)
Future , 。
protected void setException(Throwable t)
Future , ExecutionException, throwable 。
FutureTaskは、計算結果を使用する前に起動できる時間の長い計算を表すために使用できます.次のコードは、高いオーバーヘッドの計算をシミュレートすることです.start()メソッドを呼び出して計算を開始し、結果が必要な場合にgetを呼び出して結果を得ることができます.
public class Preloader {
ProductInfo loadProductInfo() throws DataLoadException {
return null;
}
private final FutureTask<ProductInfo> future = new FutureTask<ProductInfo>(
new Callable<ProductInfo>() {
public ProductInfo call() throws DataLoadException {
return loadProductInfo();
}
});
private final Thread thread = new Thread(future);
public void start() {
thread.start();
}
public ProductInfo get() throws DataLoadException, InterruptedException {
try {
return future.get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof DataLoadException)
throw (DataLoadException) cause;
else
throw new RuntimeException(e);
}
}
interface ProductInfo {
}
}
class DataLoadException extends Exception {
}
2.3、信号量
概念的には、信号量はライセンスセットを維持する.必要に応じて、ライセンスが使用可能になる前に各acquire()がブロックされ、ライセンスの取得が待機します.各release()にライセンスが追加され、ブロックされている取得者が解放される可能性があります.ただし,実際のライセンスオブジェクトを使用せずに,Semaphoreは利用可能なライセンスの番号のみをカウントし,対応する行動をとる.
Semaphoreは、通常、特定のリソース(物理的または論理的)にアクセスできるスレッドの数を制限するために使用されます.たとえば、次のクラスでは、信号量を使用してコンテンツプールへのアクセスを制御します.
class Pool {
private static final int MAX_AVAILABLE = 100;
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}
public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}
// Not a particularly efficient data structure; just for demo
protected Object[] items = ... whatever kinds of items being managed
protected boolean[] used = new boolean[MAX_AVAILABLE];
protected synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null; // not reached
}
protected synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else
return false;
}
}
return false;
}
}
1つを取得する前に、各スレッドは信号量から許可を取得し、使用可能であることを保証しなければならない.スレッドが終了すると、アイテムをプールに戻し、許可を信号量に戻し、他のスレッドがアイテムを取得できるようにします.ただし、acquire()を呼び出すと同期ロックは保持されません.これは、アイテムをプールに戻すことを阻止するためです.信号量は、プールへのアクセスを制限するために必要な同期をカプセル化し、このプール自体の一貫性を維持するために必要な同期とは別個である.
信号量を1に初期化し、使用時に使用可能なライセンスが最大1つしかないようにし、互いに反発するロックとして使用することができる.これは、通常、バイナリ信号量とも呼ばれます.これは、1つの使用可能なライセンス、またはゼロの使用可能なライセンスの2つのステータスしかないためです.このように使用する場合、バイナリ信号量は、所有者ではなくスレッドによって「ロック」を解放することができる(信号量に所有権の概念がないため).これは、デッドロックリカバリなどの特定のコンテキストで役立ちます.
Semaphoreの構築方法は、公平なパラメータをオプションで受け入れる.falseに設定すると、スレッドがライセンスを取得する順序は保証されません.特に、侵入は許可され、すなわち、待機しているスレッドの前にacquire()を呼び出すスレッドに許可を割り当てることができ、論理的には、新しいスレッドは、待機スレッドキューのヘッダに自分を置くことができる.公平にtrueに設定された場合、信号量は、任意の呼び出し取得方法のスレッドに対して、これらのメソッドを呼び出す順序(すなわち、先行先出;FIFO)に従ってスレッドを選択し、許可を得ることを保証する.なお、FIFOソートは、これらのメソッド内の指定された内部実行ポイントに必ず適用される.したがって、あるスレッドが別のスレッドよりも先にacquireを呼び出す可能性がありますが、そのスレッドの後にソートポイントに到達し、メソッドから返される場合も同様です.また、非同期tryAcquireメソッドは、公平な設定ではなく、任意の使用可能なライセンスを使用することに注意してください.
通常、すべてのスレッドがリソースにアクセスできるように、リソースアクセスを制御するための信号量を公平に初期化する必要があります.他の種類の同期制御に信号量を使用する場合、非公平なソートのスループットの利点は、通常、公平な考慮よりも重要である.
Semaphoreはまた、acquireと複数のライセンスを同時に解放する便利な方法を提供します.公平をtrueに設定していない場合、これらの方法を使用すると、不確定な延期のリスクが増加することに注意してください.
メモリ整合性効果:release()などのスレッドで「解放」メソッドを呼び出す前の操作happen-before別のスレッドでは、acquire()などの成功した「取得」メソッドの直後の操作が続きます.
2.4、柵
CyclicBarrierは、共通のバリアポイントに達するまでスレッドのセットを互いに待つことができる同期支援クラスです.一定サイズのスレッドのセットに関連するプログラムでは、これらのスレッドは常に互いに待たなければならない.この場合、CyclicBarrierは有用である.このbarrierは、待機スレッドを解放した後に再利用できるため、ループのbarrierと呼ばれる.
CyclicBarrierは、一連のスレッドの最後のスレッドが到着した後(ただし、すべてのスレッドを解放する前に)、各バリアポイントでのみ実行されるオプションのRunnableコマンドをサポートします.このバリア操作は、すべての参加スレッドを継続する前に共有状態を更新する場合に便利です.
使用例:並列分解設計でbarrierを使用する例を次に示します.
class Solver {
final int N;
final float[][] data;
final CyclicBarrier barrier;
class Worker implements Runnable {
int myRow;
Worker(int row) {
myRow = row;
}
public void run() {
while (!done()) {
processRow(myRow);
try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
}
public Solver(float[][] matrix) {
data = matrix;
N = matrix.length;
barrier = new CyclicBarrier(N,
new Runnable() {
public void run() {
//mergeRows(...);
}
});
for (int i = 0; i < N; ++i)
new Thread(new Worker(i)).start();
waitUntilDone();
}
}
この例では、各workerスレッドはマトリクスの1行を処理し、すべての行を処理する前に、スレッドはバリアで待機します.すべてのローが処理されると、提供されたRunnableバリア操作が実行され、ローがマージされます.連結者が解決策が見つかったと判断した場合、done()はtrueを返し、すべてのworkerスレッドが終了します.
バリア動作が実行中に保留中のスレッドに依存しない場合、スレッドグループ内の任意のスレッドは、解放されたときに実行できます.この操作を容易にするために、await()を呼び出すたびに、バリアに到達できるスレッドのインデックスが返されます.次に、バリア操作を実行するスレッドを選択できます.
失敗した同期試行に対して、CyclicBarrierは、中断、失敗、タイムアウトなどの原因でスレッドがバリアポイントから早すぎると、バリアポイントで待機している他のスレッドもBrokenBarrierExceptionによって異常に離れる破壊モードを使用します.
メモリ整合性効果:スレッドでawait()を呼び出す前の操作happen-beforeバリア操作の一部である操作は、happen-beforeが別のスレッドからawait()に対応して正常に返された操作に続く.