【JavaSE】マルチスレッド(9)Conditionの待機/通知メカニズム

47208 ワード

本編では、ロックシステムにおけるConditionの実現原理と、そのawaitとsignal待ち/通知メカニズムをまとめます.Object wait notify/notifyAllはオブジェクトモニタモニタモニタと連携してスレッド間の待機/通知メカニズムであり、Condition Lock はjava最下位レベル、後者は言語レベルであり、より高い制御性と拡張性を有する待機通知メカニズムであることを以前に学んだ.
1 ConditionとObjectの比較
使用方法に違いがあるほか、機能特性にも大きな違いがあります.
1 Conditionサポートは中断に応答せず、Object方式はサポートしない.2 Conditionは複数の待ち行列(new複数のConditionオブジェクト)をサポートし、Object方式は1つしかサポートしない.3 Conditionはタイムアウト時間設定をサポートし、Objectはサポートしていません.
Conditionが提供する方法は、Objectのwaitおよびnotify/notifyAllメソッドを参照することによって、以下のとおりです.
  • 等価waitの方法(待機):
    void await() throws InterruptedException:         ,       condition signal signalAll          Lock,  await    ;               。
    long awaitNanos(long nanosTimeout):               ,     。
    boolean await(long time,TimeUnit unit)throws InterruptedException:  ,         。
    boolean awaitUntil(Date deadline)throws InterruptedException:               ,         。
    
  • 等価notifyの方法(通知):
    void signal():       condition    ,                  ,           Lock          。
    void signalAll():        condition    。
    
  • 2 Condition実現原理
  • conditionのソースコードから分かるように、lock.newcondition()によってconditionオブジェクトが作成され、この方法は実際にはnew ConditionObject になり、ConditionObjectクラスはAQSの内部クラスである.
  • ConditionObjectクラスには2つのメンバー変数があります.このクラスは、待機キューのヘッダ・ポインタを持つことによってキューを管理します.
    private transient Node firstWaiter;
    private transient Node lastWaiter;
    
  • NodeクラスはAQS中のNodeクラスを多重化し、属性がある:Node nextWaiter;
  • condition内部では、先頭ノードではない一方向キューである待機キューが維持され、condition.await()メソッドを呼び出すすべてのスレッドが追加され、スレッド状態が待機状態に変換される.
  • 待ちキューは、lock.newcondition()メソッドを複数回呼び出して複数のconditionオブジェクトを作成することができ、すなわち、1つのlockが複数の待ちキューを持つことができる.
  • Object方式は、Objectオブジェクトモニタに1つの同期キューと1つの待機キューしか持たないが、パケット内のロックには1つの同期キューと複数の待機キューがある.

  • 境界キュー:キューが空の場合、キューの取得(削除)操作は、キューに新しい要素が追加されるまで取得(削除)スレッドをブロックします.キューがいっぱいになると、キューの挿入操作が挿入スレッドをブロックし、キューに空席が表示されるまでブロックします.Conditionを適用して境界キューを実現するには、次のようにします.
    public class BoundedQueue<T> {
    	private Object[] items;
    	//          
    	private int count;
    	private Lock lock = new ReentrantLock();
    	private Condition empty = lock.newCondition();
    	private Condition full = lock.newCondition();
    	public BoundedQueue(int size) {
    		items = new Object[size];
    	}
    	//       ,        ,           ,        
    	public void add(T t,int addIndex) throws InterruptedException {
    		lock.lock();
    		try {
    			//       ,          
    			while (count == items.length) {
    			full.await();
    		}
    		items[addIndex] = t;
    		count++;
    		empty.signal();
    		}finally {
    			lock.unlock();
    		}
    	}
    	//       ,        ,                      
    	public T remove(int removeIndex) throws InterruptedException {
    		lock.lock();
    		try {
    			//       ,          
    			while (count == 0) {
    			empty.await();
    			}
    			Object x = items[removeIndex];
    			count--;
    			full.signal();
    			return (T) x;
    		}finally {
    			lock.unlock();
    		}
    	}
    }
    

    3 await実現原理
    ソースコードは次のとおりです.
    public final void await() throws InterruptedException {
    	if (Thread.interrupted())
    		throw new InterruptedException();
    	// 1.        Node,        
    	Node node = addConditionWaiter();
    	// 2.          lock,                 
    	int savedState = fullyRelease(node);
    	int interruptMode = 0;
    	while (!isOnSyncQueue(node)) {
    		//                     
    		LockSupport.park(this);
    		if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    			break;
    	}
    	//                   
    	if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    		interruptMode = REINTERRUPT;
    	if (node.nextWaiter != null) // clean up if cancelled
    		unlinkCancelledWaiters();
    	//       
    	if (interruptMode != 0)
    		reportInterruptAfterWait(interruptMode);
    }
    
  • は、condition.await()メソッドを呼び出すと、signal/signnalAllによって現在のスレッドが待機キューから同期キューに移動し、lockが取得されるまでawaitメソッドから戻るか、待機中に中断されて中断されるまで待機キューに移動します.
  • 1呼び出しaddConditionWaiter()は、現在のスレッドをエンドプラグでカプセル化されたノードを待機キューに挿入する.この方法のソースコードは以下の通りである:
  • private Node addConditionWaiter() {
    	Node t = lastWaiter;
    	if (t != null && t.waitStatus != Node.CONDITION) {
    		unlinkCancelledWaiters();
    		t = lastWaiter;
    	}
    	//         Node
    	Node node = new Node(Thread.currentThread(), Node.CONDITION);
    	if (t == null)
    		firstWaiter = node;
    	else
    		//        
    		t.nextWaiter = node;
    	lastWaiter = node;
    	return node;
    }
    
  • 2現在のノードを待機キューに挿入すると、fullyRelease()の方法で現在のスレッドがlockを解放する.ソースコードは以下の通りである:
  • .
    final int fullyRelease(Node node) {
    	boolean failed = true;
    	try {
    		int savedState = getState();
    		//   AQS         release()
    		if (release(savedState)) {
    			//        
    			failed = false;
    			return savedState;
    		} else {
    			//             
    			throw new IllegalMonitorStateException();
    		}
    	} finally {
    		//                       
    		if (failed)
    			node.waitStatus = Node.CANCELLED;
    	}
    }
    
  • 上記コード呼び出しAQSのテンプレートメソッドreleaseメソッドは、AQSの同期状態を解放し、同期キュー内のヘッダノードの後続ノードが参照するスレッドを起動する.リリースが成功すると正常に戻り、リリースに失敗すると異常が放出されます.
  • 3現在のスレッドは、awaitメソッドを終了する前提で、condition.signal/signalAll()メソッドを中断または呼び出し、現在のノードを同期キューに移動させる.whileサイクルを終了するとacquireQueued(node,savedState)が呼び出され、スピンプロセスで同期状態の取得が成功するまで、すなわちcondition参照(関連付け)が取得されたlockでなければならない.

  • 4 signal/signnalAll実装原理
  • conditionを呼び出すsignalまたはsignalAllメソッドは、待ち行列の中で最も待ち時間の長いノードを同期キューに移動し、ノードがlockを取得する機会を与えることができる.
  • キューが先行するため、signalメソッドを呼び出すたびにヘッダノードを同期キューに移動することになる.
  • public final void signal() {
    	//           lock
    	if (!isHeldExclusively())
    		throw new IllegalMonitorStateException();
    	//             ,             
    	Node first = firstWaiter;
    	if (first != null)
    		doSignal(first);
    }
    
  • doSignalメソッド:
  • private void doSignal(Node first) {
    	do {
    		if ( (firstWaiter = first.nextWaiter) == null)
    			lastWaiter = null;
    		//             
    		first.nextWaiter = null;
    		// transferForSignal            
    	} while (!transferForSignal(first) && (first = firstWaiter) != null);
    }
    
  • transferForSignalメソッド:
  • final boolean transferForSignal(Node node) {
    	//           0
    	if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    		return false;
    	//      enq          
    	Node p = enq(node);
    	int ws = p.waitStatus;
    	if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    		LockSupport.unpark(node.thread);
    	return true;
    }
    
  • 結論:conditionを呼び出すsignalの前提は、現在のスレッドがlockを取得したことであり、この方法は待ち行列のヘッダノードを同期キューに移動させ、移動後に待ちスレッドを起動させる機会があり、すなわちawaitメソッドのLockSupport.park(this)メソッドから戻り、awaitメソッドを呼び出すスレッドを正常に終了させる機会がある.

  • 5 Conditionメカニズムによる生産者-消費者モデルの実現
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    class Goods {
    	//     
    	private String name;
    	//       
    	private int count;
    	//       
    	private int maxCount;
    	public Goods(int maxCount) {
    		this.maxCount = maxCount;
    	}
    	private Lock lock = new ReentrantLock();
    	private Condition consumer = lock.newCondition();
    	private Condition producer = lock.newCondition();
    	//     
    	public void setGoods(String name) {
    		lock.lock();
    		try {
    			//                    
    			while (count==maxCount) {
    				System.out.println(Thread.currentThread().getName()+"        ,       ");
    				producer.await();
    			}
    			Thread.sleep(200);
    			//     
    			this.name = name;
    			count++;
    			System.out.println(Thread.currentThread().getName()+"  "+toString());
    			//        
    			consumer.signalAll();
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		} finally {
    			lock.unlock();
    		}
    	}
    	//     
    	public void getGoods() {
    		lock.lock();
    		try {
    			//     0        
    			while (count == 0) {
    			System.out.println(Thread.currentThread().getName()+"       ,       ");
    			consumer.await();
    			}
    			Thread.sleep(200);
    			//     
    			count--;
    			System.out.println(Thread.currentThread().getName()+"  "+toString());
    			//        
    			producer.signalAll();
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		} finally {
    			lock.unlock();
    		}
    	}
    	@Override
    	public String toString() {
    		return "Goods{" +"name='" + name + '\'' +", count=" + count +'}';
    	}
    }
    class Producer implements Runnable {
    	private Goods goods;
    	public Producer(Goods goods) {
    		this.goods = goods;
    	}
    	@Override
    	public void run() {
    		while (true) {
    			this.goods.setGoods("          ");
    		}
    	}
    }
    class Consumer implements Runnable {
    	private Goods goods;
    	public Consumer(Goods goods) {
    		this.goods = goods;
    	}
    	@Override
    	public void run() {
    		while (true) {
    			this.goods.getGoods();
    		}
    	}
    }
    public class Test {
    	public static void main(String[] args) {
    		List<Thread> list = new ArrayList<>();
    		Goods goods = new Goods(10);
    		Producer producer = new Producer(goods);
    		Consumer consumer = new Consumer(goods);
    		//   10      
    		for (int i = 0;i < 10;i++) {
    			Thread thread = new Thread(consumer,"   "+i);
    			list.add(thread);
    		}
    		//   5      
    		for (int i = 0;i < 5;i++) {
    			Thread thread = new Thread(producer,"   "+i);
    			list.add(thread);
    		}
    		for (Thread th : list) {
    			th.start();
    		}
    	}
    }