スレッド間のコラボレーション

17117 ワード

スレッドを使用して複数のタスクを同時に実行する場合、ロック(反発)を使用して2つのタスクの動作を同期することで、1つのタスクが別のタスクのリソースに干渉しないようにできます.つまり、2つのタスクが交互に共有リソースに入る場合は、反発を使用して、この時点で1つのタスクだけがこのリソースにアクセスできるようにすることができます.スレッド間で反発するのではなく、複数のタスク間で問題を解決できるように協力したい場合があります.
タスクが協力するとき、重要な問題はこれらのタスク間の握手です.このような握手を実現するためには,スレッド間の基礎的特性である反発を利用しなければならない.この場合、反発は、1つのタスクのみが信号に応答することを保証し、それらの間の競合関係を解決することができる.
しかし、反発だけではタスク間の協力が完了しない場合は、反発に基づいて新しいルートが追加され、外部条件が変化するまで自身を停止させることができ、このタスクが実行される時間であることを示します.この握手はObjectの方法wait()、notify()、notifyAll()によって安全に実現できる.Java SE 5コンカレントライブラリにはawait()とsignal()のConditionオブジェクトも用意されています.
wait()とnotifyAll()
wait()は、外部条件の変更を待つ間にタスクを一時停止し、notify()とnotifyAll()が発生する場合にのみ、このタスクは発生した変化をチェックするために呼び出されます.したがってwait()は、タスク間でアクティビティを同期する方法を提供する.
ここで注意しなければならないのは、sleep()を呼び出すとロックが解放されず、yield()メソッドを呼び出す場合もこの場合です.これは重要です.タスクでwait()を呼び出すと、スレッドの実行が停止され、オブジェクト上のロックが解放されます.つまりwait()操作はロックが解放されます.これは、他のタスクがこのロックを取得し、そのタスクを実行することを意味します.
wait()には、パラメータとしてミリ秒数を受信する2つの方法があり、sleep()メソッドと同様に、作成された時間内に「実行を一時停止」することを意味します.しかしsleep()とは異なりwait()にとって:
[1]:wait()前進ロックを実行すると解放されます[2]:notify()とnotifyAll()、または時間が切れてwait()から回復できます
もう1つの方法はwait()がパラメータを受け入れないことです.この待ちは、notify()とnotifyAll()のメッセージが受信されるまで続く.
wait()、notify()およびnotifyAll()には、Threadの一部ではなく、ベースクラスObjectに属する特別な点があります.これらのメソッドが操作するロックはすべてのオブジェクトの一部であるため、wait()を任意の同期制御メソッドに置くことができ、このクラスがThreadクラスまたはRunnableインタフェースを継承しているかどうかを考慮する必要はありません.
notify()とnotifyAll()
notify()を使用する場合は、同じロックを待つ多くのタスクのうち1つだけが起動されることを意味します.notify()を使用する場合は、起動されるのが適切なタスクであることを保証する必要があります.一方notify()を使用するには、すべてのタスクが同じ条件を待たなければなりません.複数の異なる条件を待っているタスクがある場合は、適切なタスクを呼び覚ましたかどうか分かりません.notifyAll()は、このロックを待つすべてのタスクを呼び覚ます
ケース
ここでは3つの具体的なケースでスレッド間の相互協力を実現し,最初のケースはmainメソッドでサブメソッド実行出力を3回実現した後,メインメソッドプログラム実行出力を3回実行し,合計3回循環するというステップである.main()メソッドでは、modelオブジェクトというロックを共通に使用する2つのスレッドが実行されます.
public class MainThread {
     public static void main(String[] args) {

          ThreadModel model = new ThreadModel();
          new Thread(new Runnable() {
              @Override
              public void run() {
                   for(int i=0;i<3;i++)
                        model.subMethod(i+1);
              }
          }){}.start();

          for(int i=0;i<3;i++){
              model.mainMethod(i+1);
          }
     }
}
class ThreadModel{
     private boolean b = true;      //    
     public synchronized void subMethod(int count){
          while (!b){       // b   false         ,    ,           
              try {
                   this.wait();
              } catch (InterruptedException e) {
                   e.printStackTrace();
              }
          }
         //b  true                 ,     false           
          for(int i=0;i<3;i++){
              System.out.println("      "+(i+1)+" "+"...."+count);
          }
          b = false;
          this.notify();
     }
     public synchronized void mainMethod(int count){
         while (b){         // b  true          ,   ,         
              try {
                   this.wait();
              } catch (InterruptedException e) {
                   e.printStackTrace();
              }
          }
          //b  false                 ,     true           
          for(int i = 0;i <3;i++){
              System.out.println("      "+(i+1)+" "+"...."+count);
          }
          b = true;     
          this.notify();
     }
}

出力:サブスレッド実行1回目….1サブスレッド実行2回目….サブスレッド実行3回目….メインスレッド実行1回目….メインスレッド実行2回目….メインスレッド実行3回目….サブスレッド実行1回目….2サブスレッド実行2回目….サブスレッド実行3回目….メインスレッド実行1回目….2メインスレッド実行2回目….メインスレッド実行3回目….回….2サブスレッド実行1回目….3サブスレッド実行2回目….3サブスレッド実行3回目….3メインスレッド実行1回目….3メインスレッド実行2回目….3メインスレッド実行3回目….
次の例では,スレッドプールを用いて2つのプロセスを実証した:1つはワックスをCarに塗布することであり,1つはそれを研磨することである.研磨タスクは、ワックス塗布タスクが完了するまで作業を実行できません.ワックス塗布タスクは、別のワックスを塗布する前に研磨タスクの完了を待たなければなりません.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class Car{
    private boolean waxOn = false;

    public synchronized void waxed(){   //  
        waxOn = true;
        notifyAll();
    }
    public synchronized void buffed() { //  
       waxOn = false;
       notifyAll();
    }
    public synchronized void waitForWaxing() throws InterruptedException {
        while (!waxOn)
            wait();
    }
    public synchronized void waitForBuffing() throws InterruptedException {
        while (waxOn)
            wait();
    }
}

class WaxOn implements Runnable{
    private Car car;
    public WaxOn(Car car){
        this.car = car;
    }
    @Override
    public void run() {
        try{
            while (true){
                System.out.print("Wax On!");        //    
                Thread.sleep(200);
                car.waxed();        //     true ,       WaxOff     
                car.waitForBuffing();   //                  
            }
        } catch (InterruptedException e) {
            System.out.println("Exiting via interrupt");
        }
        System.out.println("Ending Wax On task");
    }
}

class WaxOff implements Runnable{
    private Car car;
    public WaxOff(Car car){
        this.car = car;
    }
    @Override
    public void run() {
        try{
            while (true){
                car.waitForWaxing();    //  waxOn  false    ,   ,       , waxOn    
                System.out.print("Wax Off!");   //        
                Thread.sleep(200);
                car.buffed();       // waxOn   false   WaxOn     
            }
        } catch (InterruptedException e) {
            System.out.println("Exiting via interrupt");
        }
        System.out.println("Ending Wax Off task");
    }
}

public class WaxOMatic {
    public static void main(String[] args) throws InterruptedException {

        Car car = new Car();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new WaxOff(car));
        exec.execute(new WaxOn(car));
        Thread.sleep(1000);      
        exec.shutdownNow();   //1s          ,  exec.shutdownNow()         interrupt()
     }
}

出力Wax On!Wax Off!Wax On!Wax Off!Wax On!Exiting via interrupt Ending Wax Off task Exiting via interrupt Ending Wax On task
次の例は、生産者と消費者の問題であり、生産が完了した後にのみ消費され、消費が完了した後に生産タスクを実行し、毎回1つのリソースのみを生産し、消費することができる.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ReSource{
    private String name;
    private int count = 1;
    private boolean flag = false;

    public synchronized void set(String name){
        while(true){
            while(flag){   //      ,          ,                  
                try {
                    wait();
                } catch (InterruptedException e) {
                    System.out.println("    ");
                }
            }
            this.name = name + count;
            count++;
            System.out.println(Thread.currentThread().getName()+ "...   ...." + this.name);
            flag = true;
            notifyAll();             //       ,             ,             
        }
    }
    public synchronized void out(){
        while(true){
            while(!flag){
                try {
                    wait();
                } catch (InterruptedException e) {
                    System.out.println("    ");
                }
            }
            System.out.println(Thread.currentThread().getName()+"...   ...." + this.name);
            flag = false;
            notifyAll();
        }
    }
}

class Producer implements Runnable{
    ReSource rs;
    Producer(ReSource rs){
        this.rs = rs;
    }

    public void run(){
        rs.set("  ");
    }
}

class Consumer implements Runnable{
    ReSource rs;
    Consumer(ReSource rs){
        this.rs = rs;
    }

    public void run(){
        rs.out();
    }

}

public class Restaurant {
    public static void main(String args []) throws InterruptedException {

        ReSource rs = new ReSource();
        Producer producer = new Producer(rs);             //            
        Consumer consumer = new Consumer(rs);
        ExecutorService service = Executors.newCachedThreadPool();

        service.execute(new Thread(producer));
        service.execute(new Thread(producer));
        service.execute(new Thread(consumer));
        service.execute(new Thread(consumer));
        Thread.sleep(10);
        service.shutdownNow();
        System.exit(0);
    }
}

出力pool-1-thread-1...生産者....ダック1 pool-1-thread-4...消費者....ダック1 pool-1-thread-2...生産者....ダック2 pool-1-thread-4...消費者....ダック2 pool-1-thread-1...生産者....ダック3 pool-1-thread-4...消費者....ダック3 pool-1-thread-2...生産者....ダック4 pool-1-thread-4...消費者....北京ダック
上記の3つの例では、判断フラグがスレッドを一時停止させるときにwhile()ループを使用し、while()ループを使用するメリットはたくさんあります.
  • 複数のタスクが異なる理由で同じロックを待っている可能性がありますが、最初の起動タスクはこのような状況を変える可能性があります(そうしなくても、クラスを継承する人がいるかもしれません).このような場合、興味のある条件が変化するまで、このタスクは再び停止されるべきです.
  • は、このタスクがwait()から起動されたときに、ある他のタスクで変更される可能性があり、このタスクが実行できないか、または他の操作を実行することは重要ではないように見えます.wait()を再度呼び出して一時停止する必要があります.
  • また、いくつかのタスクが異なる理由でオブジェクトのロックを待っている可能性があります.この場合、正しい原因で起動したかどうかを確認し、そうでなければ再び停止する必要があります.

  • 例3では、while()ループ判定フラグを用いずに、両方の生産者スレッドが保留中である場合、一方の消費者が消費タスクを完了すると、両方の生産者が起動し、一方の生産者がリソースを取得し、生産タスクを完了すると、他方の生産者スレッドが実行権を取得しif()条件の下でタグを判断しないため、別の生産者が元の保留に基づいて実行権を得ることも生成されたタスクを実行するが、本来は保留されるべきである.このとき問題が発生し、while()ループは発生せず、スレッドが起動するとループ領域がタグを判断し、このときのタグが実行を導くか、再び停止するかを判断します.
    参考書:『Javaプログラミング思想』Bruce Eckel著陳昊鵬訳