Java concurrencyのCondation条件_動力ノードJava学院の整理

9180 ワード

Condationの紹介
Condationの役割はロックをより正確に制御することです。Condationのawait()メソッドはObjectのwait()方法に相当し、Condationの中のsignal()方法はObjectのnotify()方法に相当し、Coditionの中のsignalAll()はObjectのnotifyAll()方法に相当します。違っているのは、Objectのwait()、notify()、notifyAll()の方法は「同期ロック」(synchronizedキーワード)とバンドルで使用されます。コンディショニングは「相互反発ロック」/「共有ロック」と縛られて使う必要があります。
関数リスト

//                          。
void await()
//            、                      。
boolean await(long time, TimeUnit unit)
//            、                      。
long awaitNanos(long nanosTimeout)
//                      。
void awaitUninterruptibly()
//            、                      。
boolean awaitUntil(Date deadline)
//         。
void signal()
//         。
void signalAll()
Condationの例
例1はObjectのwait()、notify()によりスレッドの休止/起動機能を実証する。
例2は、Condationのawait()、signal()によって、スレッドの休止/起動機能をデモする。
例3は、Condationによる高機能です。
例1

public class WaitTest1 {
  public static void main(String[] args) {
    ThreadA ta = new ThreadA("ta");
    synchronized(ta) { //   synchronized(ta)  “  ta    ”
      try {
        System.out.println(Thread.currentThread().getName()+" start ta");
        ta.start();
        System.out.println(Thread.currentThread().getName()+" block");
        ta.wait();  //   
        System.out.println(Thread.currentThread().getName()+" continue");
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
  static class ThreadA extends Thread{
    public ThreadA(String name) {
      super(name);
    }
    public void run() {
      synchronized (this) { //   synchronized(this)  “        ”
        System.out.println(Thread.currentThread().getName()+" wakup others");
        notify();  //   “          ”
      }
    }
  }
}
例2

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionTest1 {
  private static Lock lock = new ReentrantLock();
  private static Condition condition = lock.newCondition();
  public static void main(String[] args) {
    ThreadA ta = new ThreadA("ta");
    lock.lock(); //    
    try {
      System.out.println(Thread.currentThread().getName()+" start ta");
      ta.start();
      System.out.println(Thread.currentThread().getName()+" block");
      condition.await();  //   
      System.out.println(Thread.currentThread().getName()+" continue");
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      lock.unlock();  //    
    }
  }
  static class ThreadA extends Thread{
    public ThreadA(String name) {
      super(name);
    }
    public void run() {
      lock.lock();  //    
      try {
        System.out.println(Thread.currentThread().getName()+" wakup others");
        condition.signal();  //   “condition         ”
      } finally {
        lock.unlock();  //    
      }
    }
  }
}
実行結果:

main start ta
main block
ta wakup others
main continue
「例1」と「例2」を通して、CondationとObjectの方法に対応関係があることを知っています。
              Object      コンディショニング 
スリープ          wait        await
スレッドを起動     notify      signal
すべてのスレッドを起動   notifyAll   signal All
Condationは上記の機能をサポートする以外に、より強力なところは、マルチスレッドの休止と起動をより細かく制御できることである。同じロックに対しては、複数のCondationを作成し、異なる状況で異なるCondationを使用します。
例えば、マルチスレッドが同じバッファ領域を読み書きする場合、バッファエリアにデータを書き込むと、「スレッドを読む」ことが起動されます。バッファからデータを読み出すと「書き込みスレッド」が起動されます。また、バッファが満杯の場合、「書き込みスレッド」は待ち時間が必要です。バッファが空の場合、「スレッドを読む」には待ち時間が必要です。        Objectクラスのwait()を採用すれば、notifyAll()がこのバッファエリアを実現し、バッファエリアにデータを書き込むと「スレッドを読む」と起動する必要がある場合、notify()やnotifyAll()の明示的な指定で「スレッドを読む」と起動することはできず、notifyAllを通してすべてのスレッドを起動するしかありません。 ただし、Condationによって、明確にスレッドを指定することができます。
以下の例3を見ると、この概念をより深く理解することができる。 
例3

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
class BoundedBuffer {
  final Lock lock = new ReentrantLock();
  final Condition notFull = lock.newCondition(); 
  final Condition notEmpty = lock.newCondition(); 
  final Object[] items = new Object[5];
  int putptr, takeptr, count;
  public void put(Object x) throws InterruptedException {
    lock.lock();  //   
    try {
      //   “    ”,   ;  “  ”    ,  x      。
      while (count == items.length)
        notFull.await();
      //  x      
      items[putptr] = x; 
      //  “put   putptr+1”;  “    ”,  putptr 0。
      if (++putptr == items.length) putptr = 0;
      //  “  ”  +1
      ++count;
      //   take  ,  take    notEmpty.await()  
      notEmpty.signal();
      //        
      System.out.println(Thread.currentThread().getName() + " put "+ (Integer)x);
    } finally {
      lock.unlock();  //    
    }
  }
  public Object take() throws InterruptedException {
    lock.lock();  //   
    try {
      //   “    ”,   ;  “  ”   ,  x      。
      while (count == 0) 
        notEmpty.await();
      //  x      
      Object x = items[takeptr]; 
      //  “take   takeptr+1”;  “    ”,  takeptr 0。
      if (++takeptr == items.length) takeptr = 0;
      //  “  ”  -1
      --count;
      //   put  ,  put    notFull.await()  
      notFull.signal();
      //        
      System.out.println(Thread.currentThread().getName() + " take "+ (Integer)x);
      return x;
    } finally {
      lock.unlock();  //    
    }
  } 
}
public class ConditionTest2 {
  private static BoundedBuffer bb = new BoundedBuffer();
  public static void main(String[] args) {
    //   10 “   ”, BoundedBuffer       (  0-9);
    //   10 “   ”, BoundedBuffer       。
    for (int i=0; i<10; i++) {
      new PutThread("p"+i, i).start();
      new TakeThread("t"+i).start();
    }
  }
  static class PutThread extends Thread {
    private int num;
    public PutThread(String name, int num) {
      super(name);
      this.num = num;
    }
    public void run() {
      try {
        Thread.sleep(1);  //     1ms
        bb.put(num);    //  BoundedBuffer     
      } catch (InterruptedException e) {
      }
    }
  }
  static class TakeThread extends Thread {
    public TakeThread(String name) {
      super(name);
    }
    public void run() {
      try {
        Thread.sleep(10);          //     1ms
        Integer num = (Integer)bb.take();  //  BoundedBuffer     
      } catch (InterruptedException e) {
      }
    }
  }
}
(ある時)運転結果:
p 1 put  1
p 4 put  4
p 5 put  5
p 0 put  0
p 2 put  2
t 0 take 1
p 3 put  3
t 1 take 4
p 6 put  6
t 2 take 5
p 7 put  7
t 3 take 0
p 8 put  8
t 4 take 2
p 9 put  9
t 5 take 3
t 6 take
t 7 take 7
t 8 take 8
t 9 take 9
結果説明:
(01)BoundedBufferは容量が5のバッファであり、バッファにはObjectオブジェクトが格納されており、マルチスレッドの読み書きバッファがサポートされている。複数のスレッドが「一つのBoundedBufferオブジェクト」を操作する場合、それらは相互反発ロックによってバッファエリアitemsに相互反発訪問を行う。また、同じBoundedBufferオブジェクトの下の全てのスレッドは、「notFull」と「notEmpty」の2つのCoditionを共有しています。
       notFullは書き込みバッファを制御します。notEmptyは読み取りバッファを制御します。バッファがいっぱいになったら、putを呼び出すスレッドがnotFull.awaitを実行します。バッファが満杯でない場合は、対象をバッファに追加し、バッファの容量count+1を最後にnotEmpty.signal()バッファnotEmpty上の待ちスレッド(notEmpty.awaitを呼び出すスレッド)を呼び出します。つまり、notFullは「バッファの書き込み」を制御し、バッファにデータを書き込むとnotEmpty上の待ちスレッドが起動します。
       同じように、notEmptyは「バッファ領域の読み取り」を制御し、バッファデータを読み込むとnotFull上の待ちスレッドを起動します。
(02)ConditionTest 2のmain関数で、10個の「書き込みスレッド」を起動し、BoundedBufferの中に絶えずデータを書き込む(0-9に書き込む);また、10の「スレッド読み」も起動し、BoundedBufferからデータを読み続けています。
(03)運転結果を簡単に分析します。
     1,p 1スレッドはバッファに1を書き込みます。    バッファデータ:   | 1|   |   |   |   |
     2,p 4スレッドはバッファに4を書き込みます。    バッファデータ:   | 1|4|   |   |   |
     3,p 5スレッドはバッファに5を書き込みます。    バッファデータ:   | 1 124 4 1245 124   |   |
     4,p 0スレッドはバッファに0を書き込みます。    バッファデータ:   | 1 124 4 1240 124   |
     5,p 2スレッドはバッファに2を書き込みます。    バッファデータ:   | 1 124 4 1240 124 2 124
     この場合、バッファ容量は5となります。バッファがいっぱいですこの時、「書き込みスレッド」がバッファにデータを書き込みたい場合は、putのnotFull.await()を呼び出して待ち、直接バッファが満たせない状態で運転を続けます。
     6,t 0スレッドはバッファからデータ1を取り出します。バッファデータ:   |   | 4 124 5 1240 124 2 124
     7,p 3スレッドはバッファに3を書き込みます。    バッファデータ:   | 3 124 4 1240 124 2 124
     8,t 1スレッドはバッファからデータ4を取り出します。バッファデータ:   | 3|   | 5 124 0 124 2 124
     9,p 6スレッドはバッファに6を書き込みます。    バッファデータ:   | 3 124 6 1240 124 2 124
     ...