Java(マルチ)生産者と消費者モデル

11548 ワード

生産者と消費者モデルはJavaマルチスレッドでよく見られる古典的なモデルである.このモードでのスレッドの不安全な問題の発生と解決策を特筆したブログを作成します.
一、単一消費者と単一生産者モデル
生産者消費者モデルとは,一つ/複数のスレッドが物を生産し(資源に値を付けるなど),もう一つ/複数のスレッドが資源の中の物を消費する(資源の中の内容を出力するなど)ことである.次の例では、1つのスレッドに名前と性別を割り当て、もう1つのスレッドが名前と性別を出力します.

class Resource{
	String name;
	String sex;
	boolean flag = true;
}

class Input implements Runnable{
	Resource r;
	Input(Resource r){
		this.r = r;
	}		
	public void run() {
		while(true) {
			synchronized (r) {
				if(r.flag) {
					r.name = "john";
					r.sex = "------->male";
					r.flag = false;
				} else {
					r.name = "lilly";
					r.sex = "----------->female";
					r.flag = true;
				}
			}
		}
	}
}

class Output implements Runnable{
	Resource r;
	Output(Resource r){
		this.r = r;
	}		
	public void run() {
		while(true) {
			synchronized (r) {
				try {
					Thread.sleep(300);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
				System.out.println(r.name+r.sex);
			}
		}
	}		
}

public class Demo {
	
	public static void main(String[] args) {
		Resource r = new Resource();
		Input in = new Input(r);
		Output out = new Output(r);
		Thread t1 = new Thread(in);
		Thread t2 = new Thread(out);
		t1.start();t2.start();

	}
}

ここでの同期の使用に注意してください.同期を付けないと、名前を変更した後に出力スレッドが実行権を得て出力文を実行する可能性があります.この場合、性別は変更されていないため、john--->femaleのように性別が統一されていないスレッドの問題が発生する可能性があります.同期されたロックは、同じ(ここではリソースr)でなければなりません.各ロックは、独自のスレッドのセットを管理します.Outの同期コードブロック内のロックを別のロック(例えばnew Object())に変更したり、ロックしないと、Outがrロックによって管理されず、スレッドセキュリティの問題が発生することはありません.
Input、Outputのロックはすべてrであることがわかります.したがって、Input、OutputをResourceの同期関数に変更することができ、両方のロックも同じrオブジェクトを呼び出すことができます(this).
class Resource{
	private String name;
	private String sex;
	private boolean flag = true;
	
	public synchronized void set() {
		if(this.flag) {
			this.name = "john";
			this.sex = "------->male";
			this.flag = false;
		} else {
			this.name = "lilly";
			this.sex = "----------->female";
			this.flag = true;		
		}
	}
	
	public synchronized void show() {
		try {
			Thread.sleep(300);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println(this.name+this.sex);	
	}
}

class Input implements Runnable{
	Resource r;
	Input(Resource r){
		this.r = r;
	}		
	public void run() {
		while(true) {
			r.set();
		}
	}
}

class Output implements Runnable{
	Resource r;
	Output(Resource r){
		this.r = r;
	}		
	public void run() {
		while(true) {
			r.show();
		}		
	}
}

public class Demo {
	
	public static void main(String[] args) {
		Resource r = new Resource();
		Input in = new Input(r);
		Output out = new Output(r);
		Thread t1 = new Thread(in);
		Thread t2 = new Thread(out);
		t1.start();t2.start();

	}
}

二、マルチ消費者とマルチ生産者モデル
まずはwait()、notify()、notifyAll()の使い方をご紹介します.
wait():現在のスレッドからリソースを解放して待機ブロック状態にします.
notify():現在のロックに属するスレッドプールのいずれかのスレッドをランダムに呼び出し、スケジューリングできるようにします.
notifyAll():現在のロックに属するスレッドプール内のすべてのスレッドを起動します.
いずれのオブジェクトもロックとして使用できるため、この3つのメソッドはObjectクラスで定義されます.したがって、この3つのオブジェクトは同期コードブロックで使用でき、所属するロックによって呼び出される必要があります(ロック.wait()/notify()/notifyAll()).次のコードはダックを1匹生産し、ダックを1匹消費します.
class Resource{
	private String name ;
	private int num = 1;
    boolean flag = true;
	
	public synchronized void set() {
		// t1         t1,   flag t1       , t2            
		if(!this.flag)
			try {
				this.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		
		this.name = "duck"+"------>"+num;
		System.out.println("  "+this.name);
		num++;
		this.flag = false;
		this.notify();
	}
	
	public synchronized void show() {
		if(this.flag)
			try {
				this.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		System.out.println("  "+this.name);	
		this.flag = true;
		this.notify();
	}
}


class Input implements Runnable{
	Resource r;
	Input(Resource r){
		this.r = r;
	}		
	public void run() {
		while(true) { 
			r.set();			
		}
	}
}

class Output implements Runnable{
	Resource r;
	Output(Resource r){
		this.r = r;
	}		
	public void run() {
		while(true) {
			r.show();
		}		
	}
}

public class Demo {
	
	public static void main(String[] args) {
		Resource r = new Resource();
		Input in = new Input(r);
		Output out = new Output(r);
		Thread t1 = new Thread(in);
		Thread t2 = new Thread(out);
		t1.start();t2.start();
	}
}


1つのスレッドだけが生産され、1つのスレッドが消費される場合、プログラムは調和しています.しかし、マルチプロダクションマルチコンシューマモデルでは、次のようなセキュリティ上の問題が発生します.
class Resource{
	private String name ;
	private int num = 1;
    boolean flag = true;
	
	public synchronized void set() {
		if(!this.flag)
			try {
				this.wait();    //    wait               
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		
		this.name = "duck"+"------>"+num;
		System.out.println("  "+this.name);
		num++;
		this.flag = false;
		this.notify();
	}
	
	public synchronized void show() {
		if(this.flag)
			try {
				this.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		System.out.println("  "+this.name);	
		this.flag = true;
		this.notify();
	}
}


class Input implements Runnable{
	Resource r;
	Input(Resource r){
		this.r = r;
	}		
	public void run() {
		while(true) { 
			
			r.set();			
		}
	}
}

class Output implements Runnable{
	Resource r;
	Output(Resource r){
		this.r = r;
	}		
	public void run() {
		while(true) {
			try {
				Thread.sleep(500);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			r.show();
		}		
	}
}

public class Demo {
	
	public static void main(String[] args) {
		Resource r = new Resource();
		Input in = new Input(r);
		Output out = new Output(r);
		Thread t1 = new Thread(in);
		Thread t2 = new Thread(in);
		Thread t3 = new Thread(out);
		Thread t4 = new Thread(out);
		t1.start();t2.start();
		t3.start();t4.start();

	}
}

出力:
生産duck------->1消費duck------->1消費duck------->1消費duck------->1消費duck------->1
これは、t 1、t 2、t 4 waitの場合、t 3がduck------>1を消費すると、t 3 notify()がt 4スレッド(t 4 Aがwait状態)となり、t 4がCPU実行権を取得し、duck------>1を多く消費し、スレッドが安全でないという問題が生じるためである.
この問題の原因はif判断にある.t 4が実行権を取得すると、t 3がflagをtrueに変更してもflagは再判断されない(waitから実行が継続する).だからifをwhileに変えてt 4を振り返ってマークを再判断すればいいようです.本当にそうなの?
class Resource{
	private String name ;
	private int num = 1;
    boolean flag = true;
	
	public synchronized void set() {
		while(!this.flag)
			try {
				this.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		
		this.name = "duck"+"------>"+num;
		System.out.println("  "+this.name);
		num++;
		this.flag = false;
		this.notify();
	}
	
	public synchronized void show() {
		while(this.flag)
			try {
				this.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		System.out.println("  "+this.name);	
		this.flag = true;
		this.notify();
	}
}


class Input implements Runnable{
	Resource r;
	Input(Resource r){
		this.r = r;
	}		
	public void run() {
		while(true) { 
			
			r.set();			
		}
	}
}

class Output implements Runnable{
	Resource r;
	Output(Resource r){
		this.r = r;
	}		
	public void run() {
		while(true) {
			r.show();
		}		
	}
}

public class Demo {
	
	public static void main(String[] args) {
		Resource r = new Resource();
		Input in = new Input(r);
		Output out = new Output(r);
		Thread t1 = new Thread(in);
		Thread t2 = new Thread(in);
		Thread t3 = new Thread(out);
		Thread t4 = new Thread(out);
		t1.start();t2.start();
		t3.start();t4.start();

	}
}

このプログラムを何度も実行すると、デッドロックが発生します.これは、notify()がスレッドを1つだけ呼び覚ますためです.t 2,t 3,t 4がwait状態であり、t 1が運転状態である場合、t 1はt 2を起動しflagを変更した後に再び実行権を獲得し、t 1はwait状態である.このときt 2は実行権を獲得し、後でflagを判断し、wait状態に入る.このとき4つのスレッドはすべてwaitで、デッドロックが発生しました.
では、どうやって解決しますか?すべてのスレッドをnotifyAll()ですべて呼び覚ますだけで簡単です.これにより、味方のスレッドがすべてwait状態になっても、相手の2つのスレッドが実行資格を有し(そのうちの1つが実行権を有する)、デッドロックは発生しません.
class Resource{
	private String name ;
	private int num = 1;
    boolean flag = true;
	
	public synchronized void set() {
		while(!this.flag)
			try {
				this.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		
		this.name = "duck"+"------>"+num;
		System.out.println("  "+this.name);
		num++;
		this.flag = false;
		this.notifyAll();
	}
	
	public synchronized void show() {
		while(this.flag)
			try {
				this.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		System.out.println("  "+this.name);	
		this.flag = true;
		this.notifyAll();
	}
}


class Input implements Runnable{
	Resource r;
	Input(Resource r){
		this.r = r;
	}		
	public void run() {
		while(true) { 
			r.set();			
		}
	}
}

class Output implements Runnable{
	Resource r;
	Output(Resource r){
		this.r = r;
	}		
	public void run() {
		while(true) {
			r.show();
		}		
	}
}

public class Demo {
	
	public static void main(String[] args) {
		Resource r = new Resource();
		Input in = new Input(r);
		Output out = new Output(r);
		Thread t1 = new Thread(in);
		Thread t2 = new Thread(in);
		Thread t3 = new Thread(out);
		Thread t4 = new Thread(out);
		t1.start();t2.start();
		t3.start();t4.start();

	}
}

三、ReentrantLockを用いた最適化解決
notifyAll()を使用すると、味方スレッドも呼び出されますが、必要ありません.これは効率の低下を招く.ReentrantLockとConditionを組み合わせてこの問題を解決することができ、起動をより正確にすることができます.
ロックインタフェース:
synchronizedのロックは暗黙的に動作し、ロックインタフェースを使用するとロックを明示的に動作に変換できます.同僚の1つのロックは、複数のConditionモニタオブジェクトを掛けることもできます.
lock():ロックを取得します.
unlock():ロックを解除します.
ReentrantLock:
ロックに属する実装クラスは,反発ロックであり,1つのスレッドが改変ロックを取得すると,他のスレッドは取得できない.
Conditionインタフェース:
synchronizedでは1組のモニタメソッド(wait,notify,notifyAll)しか使用できないため、1つのロックが複数のモニタオブジェクトをマウントして複数のモニタメソッドを使用できるようにConditionオブジェクトが生成される.Conditionは以上の3つの方法をカプセル化しています.
wait()-------->await()
notify()-------->signal()
notifyAll()-------->signalAll()
import java.util.concurrent.locks.*;;

class Resource{
	private String name ;
	private int num = 1;
    boolean flag = true;
	Lock lock = new ReentrantLock();
    Condition produer_con = lock.newCondition();
    Condition Consumer_con = lock.newCondition();
    
	public void set() {
		lock.lock();
		while(!this.flag)
			try {
				produer_con.await();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		
		this.name = "duck"+"------>"+num;
		System.out.println("  "+this.name);
		num++;
		this.flag = false;
		Consumer_con.signal();
		lock.unlock();
	}
	
	public void show() {
		lock.lock();
		while(this.flag)
			try {
				Consumer_con.await();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		System.out.println("  "+this.name);	
		this.flag = true;
		produer_con.signal();
		lock.unlock();
	}
}
 
 
class Input implements Runnable{
	Resource r;
	Input(Resource r){
		this.r = r;
	}		
	public void run() {
		while(true) { 
			r.set();			
		}
	}
}
 
class Output implements Runnable{
	Resource r;
	Output(Resource r){
		this.r = r;
	}		
	public void run() {
		while(true) {
			r.show();
		}		
	}
}
 
public class Demo {
	
	public static void main(String[] args) {
		Resource r = new Resource();
		Input in = new Input(r);
		Output out = new Output(r);
		Thread t1 = new Thread(in);
		Thread t2 = new Thread(in);
		Thread t3 = new Thread(out);
		Thread t4 = new Thread(out);
		t1.start();t2.start();
		t3.start();t4.start();
 
	}
}