Javaで生産者/消費者問題を実現する
19544 ワード
Javaには同期をサポートする方法が4つあります。そのうち最初の3つは同期による方法で、残りの1つはパイプによる方法です。
(1)wait()/notify()方法
(2)await()/signal()方法
(3)BlockingQueue(ブロッキングキュー)を使う方法
(4)PipedInputStream/PipedOutputStream
私が書いたJavaコードでは、最もよく使われる最初の3つの方法だけを実装しています。コードはEclipseで動作確認済みです。
参考になる良い記事がありますので、あわせて読んでみてください。
http://blog.csdn.net/monkey_d_meng/article/details/6251879
コードは以下の通りです
package threadSync;
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Common base type for the storage implementations in this file.
 *
 * The methods here only print a trace line; the concrete storages
 * override them with real producer/consumer logic.
 */
class SupperStorage {

    /**
     * Default produce hook: prints a trace line and ignores the argument.
     *
     * @param num number of items requested (not used by this base version)
     */
    public void produce(int num) {
        trace("SupperStorage::produce()");
    }

    /**
     * Default consume hook: prints a trace line and ignores the argument.
     *
     * @param num number of items requested (not used by this base version)
     */
    public void consume(int num) {
        trace("SupperStorage::consume()");
    }

    // Writes a single line to standard output.
    private static void trace(String message) {
        System.out.println(message);
    }
}
/**
* Storage3 with LinkedBlockingQueue
*
*/
class Storage3 extends SupperStorage
{
//
private final int MAX_SIZE = 200;
//
private LinkedBlockingQueue<Object>list=new LinkedBlockingQueue<Object>(MAX_SIZE);
// num
public void produce(int num)
{
// is full
if (list.size() == MAX_SIZE)
{
System.out.println("【 】 is full: " + MAX_SIZE + " !");
}
// , num
for (int i = 1; i <= num; ++i)
{
try
{
// ,
list.put(new Object());
}
catch (InterruptedException e)
{
e.printStackTrace();
}
System.out.println("【 】: " + list.size());
}
}
// num
public void consume(int num)
{
//
if (list.size() == 0)
{
System.out.println("【 】 is empty, !");
}
// , num
for (int i = 1; i <= num; ++i)
{
try
{
// ,
list.take();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
System.out.println("【 】: " + list.size());
}
// set/get
public LinkedBlockingQueue<Object> getList()
{
return list;
}
public void setList(LinkedBlockingQueue<Object> list)
{
this.list = list;
}
public int getMAX_SIZE()
{
return MAX_SIZE;
}
}
/**
* Storage2 with await() / signal()
*
*/
class Storage2 extends SupperStorage
{
//
private final int MAX_SIZE = 200;
//
private LinkedList<Object> list = new LinkedList<Object>();
//
private final Lock lock = new ReentrantLock();
//
private final Condition full = lock.newCondition();
//
private final Condition empty = lock.newCondition();
// num
public void produce(int num)
{
//
lock.lock();
//
while (list.size() + num > MAX_SIZE)
{
System.out.println("【 】: " + num + " 【 】:" + list.size()
+ " !");
try
{
// ,
System.out.println(Thread.currentThread().getName()+" enter await");
full.await();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
// , num
for (int i = 1; i <= num; ++i)
{
list.add(new Object());
}
System.out.println(Thread.currentThread().getName() + " 【 】:" + num + " 【 】:" + list.size());
//
full.signalAll();
empty.signalAll();
//
lock.unlock();
}
// num
public void consume(int num)
{
//
lock.lock();
//
while (list.size() < num)
{
System.out.println( Thread.currentThread().getName() + "【 】: " + num + " 【 】:" + list.size()
+ " !");
try
{
// ,
System.out.println(Thread.currentThread().getName()+" enter await");
empty.await();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
// , num
for (int i = 1; i <= num; ++i)
{
list.remove();
}
System.out.println(Thread.currentThread().getName() + " 【 】: " + num + " 【 】:" + list.size());
//
full.signalAll();
empty.signalAll();
//
lock.unlock();
}
// set/get
public int getMAX_SIZE()
{
return MAX_SIZE;
}
public LinkedList<Object> getList()
{
return list;
}
public void setList(LinkedList<Object> list)
{
this.list = list;
}
}
// Storage with wait() / notify()
class Storage extends SupperStorage {
//
private final int MAX_SIZE = 200;
//
private LinkedList<Object> list = new LinkedList<Object>();
// num
public void produce(int num)
{
//
synchronized (list)
{
//
while (list.size() + num > MAX_SIZE)
{
System.out.println("【 】:" + num + "/t【 】:"
+ list.size() + "/t !");
try
{
// ,
list.wait();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
// , num
for (int i = 1; i <= num; ++i)
{
list.add(new Object());
}
System.out.println("【 】:" + num + "/t【 】:" + list.size());
list.notifyAll();
}
}
// num
public void consume(int num)
{
//
synchronized (list)
{
//
while (list.size() < num)
{
System.out.println("【 】:" + num + "/t【 】:"
+ list.size() + "/t !");
try
{
// ,
list.wait();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
// , num
for (int i = 1; i <= num; ++i)
{
list.remove();
}
System.out.println("【 】:" + num + "/t【 】:" + list.size());
list.notifyAll();
}
}
// get/set
public LinkedList<Object> getList()
{
return list;
}
public void setList(LinkedList<Object> list)
{
this.list = list;
}
public int getMAX_SIZE()
{
return MAX_SIZE;
}
}
/**
 * Producer implemented as a {@link Thread} subclass.
 *
 * When started, it asks its storage to produce {@code num} items once.
 */
class Producer extends Thread
{
    // Number of items produced per run; stays 0 until setNum is called
    private int num;
    // Storage that receives the produced items
    private SupperStorage storage;

    /**
     * @param storage storage this producer writes to
     */
    public Producer(SupperStorage storage)
    {
        this.storage = storage;
    }

    /** Thread body: produces the configured number of items. */
    @Override
    public void run()
    {
        produce(num);
    }

    /**
     * Delegates to the storage's produce method.
     *
     * @param num number of items to produce
     */
    public void produce(int num)
    {
        storage.produce(num);
    }

    /** @return number of items produced per run */
    public int getNum()
    {
        return num;
    }

    /** @param num number of items to produce per run */
    public void setNum(int num)
    {
        this.num = num;
    }

    /** @return storage this producer writes to */
    public SupperStorage getStorage()
    {
        return storage;
    }

    /** @param storage storage this producer writes to */
    public void setStorage(SupperStorage storage)
    {
        this.storage = storage;
    }
}
/**
 * Producer implemented as a {@link Runnable}, so it can be handed to a
 * {@link Thread} or submitted to an executor.
 *
 * When run, it asks its storage to produce {@code num} items once.
 */
class Producer2 implements Runnable
{
    // Number of items produced per run; stays 0 until setNum is called
    private int num;
    // Storage that receives the produced items
    private SupperStorage storage;

    /**
     * @param storage storage this producer writes to
     */
    public Producer2(SupperStorage storage)
    {
        this.storage = storage;
    }

    /** Task body: produces the configured number of items. */
    @Override
    public void run()
    {
        produce(num);
    }

    /**
     * Delegates to the storage's produce method.
     *
     * @param num number of items to produce
     */
    public void produce(int num)
    {
        storage.produce(num);
    }

    /** @return number of items produced per run */
    public int getNum()
    {
        return num;
    }

    /** @param num number of items to produce per run */
    public void setNum(int num)
    {
        this.num = num;
    }

    /** @return storage this producer writes to */
    public SupperStorage getStorage()
    {
        return storage;
    }

    /** @param storage storage this producer writes to */
    public void setStorage(SupperStorage storage)
    {
        this.storage = storage;
    }
}
/**
 * Consumer implemented as a {@link Thread} subclass.
 *
 * When started, it asks its storage to consume {@code num} items once.
 */
class Consumer extends Thread
{
    // Number of items consumed per run; stays 0 until setNum is called
    private int num;
    // Storage the items are taken from
    private SupperStorage storage;

    /**
     * @param storage storage this consumer reads from
     */
    public Consumer(SupperStorage storage)
    {
        this.storage = storage;
    }

    /** Thread body: consumes the configured number of items. */
    @Override
    public void run()
    {
        consume(num);
    }

    /**
     * Delegates to the storage's consume method.
     *
     * @param num number of items to consume
     */
    public void consume(int num)
    {
        storage.consume(num);
    }

    /** @return number of items consumed per run */
    public int getNum()
    {
        return num;
    }

    /** @param num number of items to consume per run */
    public void setNum(int num)
    {
        this.num = num;
    }

    /** @return storage this consumer reads from */
    public SupperStorage getStorage()
    {
        return storage;
    }

    /** @param storage storage this consumer reads from */
    public void setStorage(SupperStorage storage)
    {
        this.storage = storage;
    }
}
/**
 * Consumer implemented as a {@link Runnable}, so it can be handed to a
 * {@link Thread} or submitted to an executor.
 *
 * When run, it asks its storage to consume {@code num} items once.
 */
class Consumer2 implements Runnable
{
    // Number of items consumed per run; stays 0 until setNum is called
    private int num;
    // Storage the items are taken from
    private SupperStorage storage;

    /**
     * @param storage storage this consumer reads from
     */
    public Consumer2 (SupperStorage storage)
    {
        this.storage = storage;
    }

    /** Task body: consumes the configured number of items. */
    @Override
    public void run()
    {
        consume(num);
    }

    /**
     * Delegates to the storage's consume method.
     *
     * @param num number of items to consume
     */
    public void consume(int num)
    {
        storage.consume(num);
    }

    /** @return number of items consumed per run */
    public int getNum()
    {
        return num;
    }

    /** @param num number of items to consume per run */
    public void setNum(int num)
    {
        this.num = num;
    }

    /** @return storage this consumer reads from */
    public SupperStorage getStorage()
    {
        return storage;
    }

    /** @param storage storage this consumer reads from */
    public void setStorage(SupperStorage storage)
    {
        this.storage = storage;
    }
}
/**
 * Test driver for the producer/consumer storages.
 *
 * Each scenario wires seven producers and three consumers to one shared
 * storage and launches them in a different way: Thread subclasses,
 * Runnables wrapped in named Threads, or Runnables submitted to a pool.
 */
public class mythreadsync
{
    /**
     * Scenario using the Thread subclasses Producer and Consumer.
     * Consumers are started before producers.
     */
    public void test_thread() {
        System.out.println("test_thread(): enter");
        // Alternative back ends that can be swapped in here: new Storage3(), new Storage()
        SupperStorage storage = new Storage2();

        int[] producerBatches = {10, 10, 10, 10, 10, 10, 80};
        int[] consumerBatches = {50, 20, 30};

        // Producers are constructed before consumers, which fixes the default thread names
        Producer[] producers = new Producer[producerBatches.length];
        for (int i = 0; i < producers.length; i++) {
            producers[i] = new Producer(storage);
            producers[i].setNum(producerBatches[i]);
        }
        Consumer[] consumers = new Consumer[consumerBatches.length];
        for (int i = 0; i < consumers.length; i++) {
            consumers[i] = new Consumer(storage);
            consumers[i].setNum(consumerBatches[i]);
        }

        for (Consumer consumer : consumers) {
            consumer.start();
        }
        for (Producer producer : producers) {
            producer.start();
        }
        System.out.println("test_thread(): exit");
    }

    /**
     * Scenario using the Runnable variants, each wrapped in an explicitly
     * named Thread. Consumers are started before producers.
     */
    public void test_thread_runnable() {
        System.out.println("test_thread_runnable(): enter");
        SupperStorage storage = new Storage2();

        Producer2[] producers = newProducers(storage, new int[] {10, 10, 10, 10, 10, 20, 80});
        Consumer2[] consumers = newConsumers(storage, new int[] {50, 20, 30});

        for (int i = 0; i < consumers.length; i++) {
            new Thread(consumers[i], "consumer " + (i + 1)).start();
        }
        for (int i = 0; i < producers.length; i++) {
            new Thread(producers[i], "producer " + (i + 1)).start();
        }
        System.out.println("test_thread_runnable(): exit");
    }

    /**
     * Scenario using the Runnable variants submitted to a fixed pool of
     * five worker threads. Consumers are submitted before producers.
     */
    public void test_threadpool_runnal() {
        System.out.println("test_threadpool_runnal(): enter");
        SupperStorage storage = new Storage2();

        Producer2[] producers = newProducers(storage, new int[] {10, 20, 30, 40, 50, 60, 70});
        Consumer2[] consumers = newConsumers(storage, new int[] {30, 50, 80});

        // Pool limited to 5 worker threads
        ExecutorService pool = Executors.newFixedThreadPool(5);

        // 3 consumers
        for (Consumer2 consumer : consumers) {
            pool.submit(consumer);
        }
        // 7 producers
        for (Producer2 producer : producers) {
            pool.submit(producer);
        }
        /*
         * Sample output. It shows at most 5 working threads in parallel even
         * though 10 tasks were submitted.
         *
        test_threadpool_runnal(): enter
        pool-1-thread-1【 】: 30 【 】:0 !
        pool-1-thread-1 enter await
        pool-1-thread-2【 】: 50 【 】:0 !
        pool-1-thread-2 enter await
        pool-1-thread-4 【 】:10 【 】:10
        test_threadpool_runnal(): exit
        pool-1-thread-5 【 】:20 【 】:30
        pool-1-thread-3【 】: 80 【 】:30 !
        pool-1-thread-3 enter await
        pool-1-thread-1 【 】: 30 【 】:0
        pool-1-thread-1 【 】:50 【 】:50
        pool-1-thread-1 【 】:60 【 】:110
        pool-1-thread-1 【 】:70 【 】:180
        pool-1-thread-2 【 】: 50 【 】:130
        pool-1-thread-4 【 】:30 【 】:160
        pool-1-thread-5 【 】:40 【 】:200
        pool-1-thread-3 【 】: 80 【 】:120
        */
        pool.shutdown();
        System.out.println("test_threadpool_runnal(): exit");
    }

    // Builds one Producer2 per entry of batches, each bound to storage.
    private static Producer2[] newProducers(SupperStorage storage, int[] batches) {
        Producer2[] result = new Producer2[batches.length];
        for (int i = 0; i < batches.length; i++) {
            result[i] = new Producer2(storage);
            result[i].setNum(batches[i]);
        }
        return result;
    }

    // Builds one Consumer2 per entry of batches, each bound to storage.
    private static Consumer2[] newConsumers(SupperStorage storage, int[] batches) {
        Consumer2[] result = new Consumer2[batches.length];
        for (int i = 0; i < batches.length; i++) {
            result[i] = new Consumer2(storage);
            result[i].setNum(batches[i]);
        }
        return result;
    }

    /**
     * Entry point: runs the thread-pool scenario.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        mythreadsync driver = new mythreadsync();
        // The other scenarios, test_thread() and test_thread_runnable(), can be run here instead
        driver.test_threadpool_runnal();
    }
}