Javaマルチスレッド学習の生産者消費者モデル:より完璧な実現

11526 ワード

生産者、消費者モデルはマルチスレッドを学習する際の良い練習モデルである.この問題の専門的な言い方は:有限緩衝問題であるべきである.この問題は、固定サイズバッファを共有する2つのスレッド、すなわち、いわゆる「生産者」および「消費者」が実際の実行時に発生する問題を記述する.
生産者の役割は、バッファに一定量のデータを生成し、このプロセスを繰り返すことです.消費者の役割は、これらのデータを消費することです.問題の鍵は、生産者がバッファがいっぱいになったときにデータを追加しないこと、消費者がバッファが空いたときにデータを消費しないことを保証することです.
問題を解決する方法はスレッドプログラミングを採用してもよいし、スレッドプログラミングを採用しなくてもよい.採用しないと簡単に満か空の状態の判断をし、条件を満たさない場合はデータを放棄します.もちろんこのような実現の意味は大きくない.スレッドプログラミングを採用する場合は、バッファがいっぱいになると生産者はスリープし、消費者がバッファ内のデータを消費するまで生産者を呼び覚ます.同様に、バッファが空の場合に消費者を休眠させ、生産者がバッファにデータを追加した後に起動する.
-------------------------------------------------------------------------------------------------------------------------------
以下の実装案はjavaスレッドの知識を自分で学習した後に書いた初級ソリューションであり,wait(),notify()などの基礎スレッド通信方法を用いた.
この案は単一の生産者と単一の消費者を実現することができ、複数の生産者と複数の消費者にも適用することができるため、実際には比較的完璧な方法と呼ばれている.
それだけでなく、この案では各生産者の生産任務と、各消費者の消費任務を制定することができるため、両者の総和が等しいと、非常に解決しやすい.しかし、両者の総和が等しくなければ、例えば生産者の総任務が20個、消費者の総任務が30個あれば、生産者がすべて生産した後、消費者は無期限に死ぬだろう.その後、死待ちを解決するためにいくつかの検出メカニズムを追加しました.一方のタスクが完了し、もう一方がwait()に入ると、相手のタスクがすべて完了したことを検出しました.wait()ではなく、skip waitで、直接タスクを終了します.
この実装の欠点は,設計モード上の問題であり,きちんと下書き設計をしていないため,OOの基本原則に反する方法がある可能性があり,後で改善を待つ.同時にウィキペディアでは信号灯アルゴリズム,パイプアルゴリズムなどの成熟したアルゴリズム,およびマルチスレッドにおけるcondition,再入ロックなどの特性は深く研究されておらず,これらは後で実現できる.
--------------------------------------------------------------------------------------------------------------------------------
内容が多すぎるので、実現の考え方を簡単に述べるしかありません.
比較的良いシミュレーションの実際のために、Consumer消費者類、Producer生産者類、Product製品類、Warehouse倉庫類を書き、すでに単C単P、単C多P、多C単P、多C多Pに対してそれぞれ1つのテスト類を書き、結果は運行が良好である.
いくつかのクラスは以下のように簡単に説明されています.具体的にはコードを見てみましょう.内容が多すぎます.
Consumer:消費者をシミュレートし、複数のインスタンスを持つことができ、各名前を持つことができ、各消費タスク量を指定することができ、その後、クラスの静的変数が総タスク数を示すことができます.
public class Consumer implements Runnable{
	private static int totalTaskNumber=0;
	private int myTaskNumber=0;
	private static int totalTaskRemain=0;
	private int myTaskRemain=0;
	private String name;
	private Warehouse wh;
	private static int totalConsumed=0; //The total number of product produced yet.
	private int thisOneConsumed=0; //The number of product produced by this producer.
	private boolean needContinue=true;
	
	public Consumer(String name, Warehouse wh, int taskNumber){
		this.name=name;
		this.wh=wh;
		myTaskNumber=taskNumber;
		myTaskRemain=taskNumber;
		totalTaskNumber+=taskNumber;
		totalTaskRemain+=taskNumber;
	}
	
	public void consume(){
//		System.out.println("Im consumer here!");
		Product popedProduct=wh.pop(this);
		if(popedProduct==null) needContinue=false;
	}
	
	public void run(){
		for(int i=0;i

Producer:生産者をシミュレートし、消費者と差が少ない.
public class Producer implements Runnable{
	private static int totalTaskNumber=0;
	private int myTaskNumber=0;
	private static int totalTaskRemain=0;
	private int myTaskRemain=0;
	private String name;
	private Warehouse wh;
	private static int totalProduced=0; //The total number of product produced yet.
	private int thisOneProduced=0; //The number of product produced by this producer.
	private boolean needContinue=true;
	
	public Producer(String name, Warehouse wh, int taskNumber){
		this.name=name;
		this.wh=wh;
		myTaskNumber=taskNumber;
		myTaskRemain=taskNumber;
		totalTaskNumber+=myTaskNumber;
		totalTaskRemain+=myTaskNumber;
	}
	
	public void produce(){
		Product toProduce=new Product(++Product.totalID);
		needContinue=wh.push(toProduce,this);
	}
	
	public void run(){
		for(int i=0;i

Product:生産された製品をシミュレートします.現在、このクラスは比較的簡単で、属性ID番号が1つしかありません.つまり、1つの製品を生産するたびに、順番に番号をつけます.このクラスを書いて生産された製品をシミュレートし,簡単にデータを押し込むのではなく,後でこのクラスを容易に拡張できるため,潜在的な実用性が大きい.
public class Product {
	public static int totalID=0;
	private int ID;
	
	public Product(int ID){
		this.ID=ID;
	}
	
	public int getID(){
		return ID;
	}
}

Warehouse:シミュレーションウェアハウス、いわゆるマルチスレッド共有データバッファは、ほとんどの設計の心血がこのクラスにあります.構造はスタックで実現され,最も重要な動作はpush,pop法であり,前者は生産者のインスタンスのみで呼び出され,後者は消費者のインスタンスのみで呼び出される.他にもisEmpty、isFullなどの補助方法がありますが、詳細は私のブログのスタック実装に関するログを参照してください.
Warehouseの細部についてお話ししますが、実現の鍵です.倉庫がいっぱいになると、isFullメソッドはtrueに戻り、このときpushメソッドの生産者を呼び出したいと思っています.彼らはpushの製品が構築されていると思っていますが、倉庫に押し込むステップはwait()に変わり、消費者がnotifyAll()に来るまで変わります.同じように、倉庫がいっぱいになったとき、消費者がpop()を考えたら、生産作業を終えたばかりの生産者に呼び覚まされるまで待つことになります.さらに、生産タスクが完了するたびにnotifyAllが必要で、待機している消費者に消費できることを通知し、消費タスクが完了するたびにnotifyAllで生産者倉庫が爆発しないことを通知し、製品を押し込むことができます.
次に、タスクが同期していない問題について話します.各生産者または消費者は、インスタンスを作成するときに生産または消費タスクを割り当てることができ、生産者の総タスクと消費者の総タスクとが等しくなければ、一方の永続的な待機をもたらす.この問題を解決するために,生産者と消費者を加えたクラス静的変数は総タスクを示し,生産または消費ごとに静的変数が1つ減少し,ゼロに戻ったとき,この側のタスクがすべて完了したことを示した.だから、それぞれがwait()メソッドに移行するときは、まず相手のこの変数をチェックし、相手のこの変数が0で、タスクがすべて完了している場合は、あなたが待っても意味がなく、永久的な待機に陥るだけなので、この側も直接終了します.
生産者が先に終わると、消費者は倉庫の製品を消費してから、消費任務があっても構わず、そのまま終わる.
消費者が先に終わると、生産者は実際に生産を続け、倉庫を爆倉に敷き詰め、その後はwait()状態に移行しようとしたが、転入しても消費者が消費していないので、そのまま終了した!
public class Warehouse {
	private final int capacity=10;
	private Product[] storage;
	private int nextPos=0;
	boolean allCsmFinished=false;
	boolean allPdcFinished=false;
	
	public Warehouse(){
		storage=new Product[10];
	}

	//This method is only invoked by producer
	public synchronized boolean push(Product toPush, Producer invoker){
		//The use  of while is very important. Using if() statement will cause exception in M_P_M_C model!.
		while(isFull()){ 
			try {
				if(allCsmFinished){
					System.out.println("ACF! "+invoker.getName()+" skips waiting!
"); return false; } System.out.println(invoker.getName()+" waits!
"); wait(); //The invoker of wait() method is acquiescently the owner of this synchronized lock. } catch (InterruptedException e) { e.printStackTrace(); } } storage[nextPos]=toPush; nextPos++; System.out.println("Produced! ID "+toPush.getID()+" "+toString()); invoker.incThisOneProduced(); invoker.incTotalProduced(); System.out.println(invoker.toString()+"
"); invoker.decMyTaskRemain(); invoker.decTotalTaskRemain(); invoker.testFinish(); if(invoker.getTotalTaskRemain()==0) setAllPdcFinished(true); // System.out.println("Producer notifies all!
"); //It's safe to notifyAll() at this moment since nextPos is already changed. notifyAll(); return true; } //This method is only invoked by consumer public synchronized Product pop(Consumer invoker){ //The use of while is very important. Using if() statement will cause exception in M_P_M_C model!. while(isEmpty()){ try{ if(allPdcFinished){ System.out.println("APF! "+invoker.getName()+" skips waiting!
"); return null; } System.out.println(invoker.getName()+" waits!
"); // System.out.println(Test_SingleP_MultiC.CT1.isAlive()); // System.out.println(Test_SingleP_MultiC.CT2.isAlive()); // System.out.println(Test_SingleP_MultiC.PT1.isAlive()); wait(); } catch (InterruptedException e) { e.printStackTrace(); } } int idxToReturn=--nextPos; System.out.println("Consumed! ID "+storage[idxToReturn].getID()+" "+toString()); invoker.incThisOneConsumed(); invoker.incTotalConsumed(); System.out.println(invoker.toString()+"
"); invoker.decMyTaskRemain(); invoker.decTotalTaskRemain(); invoker.testFinish(); if(invoker.getTotalTaskRemain()==0) setAllCsmFinished(true); //NotifyAll() at this time is secure, because nextPos is changed, and return index is saved in another variabl and thus unchanged. notifyAll(); return storage[idxToReturn]; } public boolean isEmpty(){ return nextPos==0; } public boolean isFull(){ return nextPos==capacity; } public int inventory(){ return nextPos; } public void setAllCsmFinished(boolean toSet){ allCsmFinished=toSet; } public void setAllPdcFinished(boolean toSet){ allPdcFinished=toSet; } public String toString(){ String str=""; str+="[Warehouse status: Inventory="+inventory()+" Products:"; for(int i=0;i

さらに4つのテストクラスを添付します.
1.単一生産者、単一消費者.
public class Test_SingleP_SingleC {
	public static void main(String[] args){
		Warehouse wh=new Warehouse();
		Thread PT1=new Thread(new Producer("P_one",wh,50),"ProducerThread one");
		Thread CT1=new Thread(new Consumer("C_one",wh,50),"ConsumerrThread one");
//		PT1.setPriority(Thread.MIN_PRIORITY);
//		CT1.setPriority(Thread.MAX_PRIORITY);
		PT1.start();
		CT1.start();		
	}
}

2.単生産者、多消費者.
public class Test_SingleP_MultiC {
	public static void main(String[] args){
		Warehouse wh=new Warehouse();
		Thread PT1=new Thread(new Producer("P_one",wh,50),"ProducerThread one");
		Thread CT1=new Thread(new Consumer("C_one",wh,1),"ConsumerrThread one");
		Thread CT2=new Thread(new Consumer("C_two",wh,1),"ConsumerrThread two");
		
		
//		PT1.setPriority(Thread.MIN_PRIORITY);
//		CT1.setPriority(Thread.MAX_PRIORITY);
		PT1.start();
		CT1.start();
		CT2.start();
	}
}

3.多生産者、単消費者.
public class Test_MultiP_SingleC {
	public static void main(String[] args){
		Warehouse wh=new Warehouse();
		Thread PT1=new Thread(new Producer("P_one",wh,20),"ProducerThread one");
		Thread PT2=new Thread(new Producer("P_two",wh,50),"ProducerThread two");
		Thread CT2=new Thread(new Consumer("C_two",wh,30),"ConsumerrThread two");
		
//		PT1.setPriority(Thread.MIN_PRIORITY);
//		CT1.setPriority(Thread.MAX_PRIORITY);
		PT1.start();
		PT2.start();
		CT2.start();
	}
}

4.多生産者、多消費者.
public class Test_MultiP_MultiC {
	public static void main(String[] args){
		Warehouse wh=new Warehouse();
		Thread PT1=new Thread(new Producer("P_one",wh,20),"ProducerThread one");
		Thread PT2=new Thread(new Producer("P_two",wh,60),"ProducerThread two");
		Thread CT1=new Thread(new Consumer("C_one",wh,50),"ConsumerrThread one");		
		Thread CT2=new Thread(new Consumer("C_two",wh,30),"ConsumerrThread two");
//		PT1.setPriority(Thread.MIN_PRIORITY);
//		CT1.setPriority(Thread.MAX_PRIORITY);
		PT1.start();
		PT2.start();
		CT1.start();
		CT2.start();
	}
}

大体ここまで説明します.いくつかの点を考慮してから実現すれば、いくつかのパターンの問題を考慮します.
さらに機能を分離して、各クラスの独立性をより強くし、集約性をより少なくすることができますか?
消費者と生産者は自分の任務だけに関心を持ち、任務の進度を倉庫類に通知する.
倉庫類は消費生産の詳細に関心を持たず、通知を受けた後、両者の進捗情報だけでスケジューリングを行う!
これはもっと現実に合っているようですが...