【JavaSE】マルチスレッド(9)Conditionの待機/通知メカニズム
47208 ワード
本編では、ロックシステムにおけるConditionの実現原理と、そのawaitとsignal待ち/通知メカニズムをまとめます.
1 ConditionとObjectの比較
使用方法に違いがあるほか、機能特性にも大きな違いがあります.
1 Conditionサポートは中断に応答せず、Object方式はサポートしない.2 Conditionは複数の待ち行列(new複数のConditionオブジェクト)をサポートし、Object方式は1つしかサポートしない.3 Conditionはタイムアウト時間設定をサポートし、Objectはサポートしていません.
Conditionが提供する方法は、Objectのwaitおよびnotify/notifyAllメソッドを参照することによって、以下のとおりです.等価waitの方法(待機): 等価notifyの方法(通知): 2 Condition実現原理 conditionのソースコードから分かるように、 ConditionObjectクラスには2つのメンバー変数があります.このクラスは、待機キューのヘッダ・ポインタを持つことによってキューを管理します. NodeクラスはAQS中のNodeクラスを多重化し、属性がある: condition内部では、先頭ノードではない一方向キューである待機キューが維持され、 待ちキューは、 Object方式は、Objectオブジェクトモニタに1つの同期キューと1つの待機キューしか持たないが、パケット内のロックには1つの同期キューと複数の待機キューがある.
境界キュー:キューが空の場合、キューの取得(削除)操作は、キューに新しい要素が追加されるまで取得(削除)スレッドをブロックします.キューがいっぱいになると、キューの挿入操作が挿入スレッドをブロックし、キューに空席が表示されるまでブロックします.Conditionを適用して境界キューを実現するには、次のようにします.
3 await実現原理
ソースコードは次のとおりです.は、 1呼び出し 2現在のノードを待機キューに挿入すると、 .上記コード呼び出しAQSのテンプレートメソッドreleaseメソッドは、AQSの同期状態を解放し、同期キュー内のヘッダノードの後続ノードが参照するスレッドを起動する.リリースが成功すると正常に戻り、リリースに失敗すると異常が放出されます. 3現在のスレッドは、awaitメソッドを終了する前提で、
4 signal/signnalAll実装原理 conditionを呼び出すsignalまたはsignalAllメソッドは、待ち行列の中で最も待ち時間の長いノードを同期キューに移動し、ノードがlockを取得する機会を与えることができる. キューが先行するため、signalメソッドを呼び出すたびにヘッダノードを同期キューに移動することになる. doSignalメソッド: transferForSignalメソッド: 結論:conditionを呼び出すsignalの前提は、現在のスレッドがlockを取得したことであり、この方法は待ち行列のヘッダノードを同期キューに移動させ、移動後に待ちスレッドを起動させる機会があり、すなわちawaitメソッドのLockSupport.park(this)メソッドから戻り、awaitメソッドを呼び出すスレッドを正常に終了させる機会がある.
5 Conditionメカニズムによる生産者-消費者モデルの実現
Object wait notify/notifyAll
はオブジェクトモニタモニタモニタと連携してスレッド間の待機/通知メカニズムであり、Condition Lock
はjava最下位レベル、後者は言語レベルであり、より高い制御性と拡張性を有する待機通知メカニズムであることを以前に学んだ.1 ConditionとObjectの比較
使用方法に違いがあるほか、機能特性にも大きな違いがあります.
1 Conditionサポートは中断に応答せず、Object方式はサポートしない.2 Conditionは複数の待ち行列(new複数のConditionオブジェクト)をサポートし、Object方式は1つしかサポートしない.3 Conditionはタイムアウト時間設定をサポートし、Objectはサポートしていません.
Conditionが提供する方法は、Objectのwaitおよびnotify/notifyAllメソッドを参照することによって、以下のとおりです.
void await() throws InterruptedException: , condition signal signalAll Lock, await ; 。
long awaitNanos(long nanosTimeout): , 。
boolean await(long time,TimeUnit unit)throws InterruptedException: , 。
boolean awaitUntil(Date deadline)throws InterruptedException: , 。
void signal(): condition , , Lock 。
void signalAll(): condition 。
lock.newcondition()
によってconditionオブジェクトが作成され、この方法は実際にはnew ConditionObject
になり、ConditionObjectクラスはAQSの内部クラスである.private transient Node firstWaiter;
private transient Node lastWaiter;
Node nextWaiter;
condition.await()
メソッドを呼び出すすべてのスレッドが追加され、スレッド状態が待機状態に変換される.lock.newcondition()
メソッドを複数回呼び出して複数のconditionオブジェクトを作成することができ、すなわち、1つのlockが複数の待ちキューを持つことができる.境界キュー:キューが空の場合、キューの取得(削除)操作は、キューに新しい要素が追加されるまで取得(削除)スレッドをブロックします.キューがいっぱいになると、キューの挿入操作が挿入スレッドをブロックし、キューに空席が表示されるまでブロックします.Conditionを適用して境界キューを実現するには、次のようにします.
public class BoundedQueue<T> {
private Object[] items;
//
private int count;
private Lock lock = new ReentrantLock();
private Condition empty = lock.newCondition();
private Condition full = lock.newCondition();
public BoundedQueue(int size) {
items = new Object[size];
}
// , , ,
public void add(T t,int addIndex) throws InterruptedException {
lock.lock();
try {
// ,
while (count == items.length) {
full.await();
}
items[addIndex] = t;
count++;
empty.signal();
}finally {
lock.unlock();
}
}
// , ,
public T remove(int removeIndex) throws InterruptedException {
lock.lock();
try {
// ,
while (count == 0) {
empty.await();
}
Object x = items[removeIndex];
count--;
full.signal();
return (T) x;
}finally {
lock.unlock();
}
}
}
3 await実現原理
ソースコードは次のとおりです.
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 1. Node,
Node node = addConditionWaiter();
// 2. lock,
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
//
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
//
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
condition.await()
メソッドを呼び出すと、signal/signnalAllによって現在のスレッドが待機キューから同期キューに移動し、lockが取得されるまでawaitメソッドから戻るか、待機中に中断されて中断されるまで待機キューに移動します.addConditionWaiter()
は、現在のスレッドをエンドプラグでカプセル化されたノードを待機キューに挿入する.この方法のソースコードは以下の通りである:private Node addConditionWaiter() {
Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// Node
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
//
t.nextWaiter = node;
lastWaiter = node;
return node;
}
fullyRelease()
の方法で現在のスレッドがlockを解放する.ソースコードは以下の通りである:final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
// AQS release()
if (release(savedState)) {
//
failed = false;
return savedState;
} else {
//
throw new IllegalMonitorStateException();
}
} finally {
//
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
condition.signal/signalAll()
メソッドを中断または呼び出し、現在のノードを同期キューに移動させる.whileサイクルを終了するとacquireQueued(node,savedState)
が呼び出され、スピンプロセスで同期状態の取得が成功するまで、すなわちcondition参照(関連付け)が取得されたlockでなければならない.4 signal/signnalAll実装原理
public final void signal() {
// lock
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// ,
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
//
first.nextWaiter = null;
// transferForSignal
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
// 0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// enq
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
5 Conditionメカニズムによる生産者-消費者モデルの実現
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class Goods {
//
private String name;
//
private int count;
//
private int maxCount;
public Goods(int maxCount) {
this.maxCount = maxCount;
}
private Lock lock = new ReentrantLock();
private Condition consumer = lock.newCondition();
private Condition producer = lock.newCondition();
//
public void setGoods(String name) {
lock.lock();
try {
//
while (count==maxCount) {
System.out.println(Thread.currentThread().getName()+" , ");
producer.await();
}
Thread.sleep(200);
//
this.name = name;
count++;
System.out.println(Thread.currentThread().getName()+" "+toString());
//
consumer.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
//
public void getGoods() {
lock.lock();
try {
// 0
while (count == 0) {
System.out.println(Thread.currentThread().getName()+" , ");
consumer.await();
}
Thread.sleep(200);
//
count--;
System.out.println(Thread.currentThread().getName()+" "+toString());
//
producer.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
@Override
public String toString() {
return "Goods{" +"name='" + name + '\'' +", count=" + count +'}';
}
}
class Producer implements Runnable {
private Goods goods;
public Producer(Goods goods) {
this.goods = goods;
}
@Override
public void run() {
while (true) {
this.goods.setGoods(" ");
}
}
}
class Consumer implements Runnable {
private Goods goods;
public Consumer(Goods goods) {
this.goods = goods;
}
@Override
public void run() {
while (true) {
this.goods.getGoods();
}
}
}
public class Test {
public static void main(String[] args) {
List<Thread> list = new ArrayList<>();
Goods goods = new Goods(10);
Producer producer = new Producer(goods);
Consumer consumer = new Consumer(goods);
// 10
for (int i = 0;i < 10;i++) {
Thread thread = new Thread(consumer," "+i);
list.add(thread);
}
// 5
for (int i = 0;i < 5;i++) {
Thread thread = new Thread(producer," "+i);
list.add(thread);
}
for (Thread th : list) {
th.start();
}
}
}