Javaで生産者/消費者問題を実現する

19544 ワード


Javaで生産者/消費者問題を実現する方法は4つあります。最初の3つはスレッド同期による方法で、残りの1つはパイプによる方法です。
(1)wait()/notify()方法
(2)await()/signal()方法
(3)BlockingQueue(ブロッキングキュー)方法
(4)PipedInputStream/PipedOutputStream
私が書いたJavaコードでは、最もよく使われる最初の3つの方法だけを実装しています。このコードはEclipseで動作確認済みです。
参考になる記事がありますので、ぜひ読んでみてください。
http://blog.csdn.net/monkey_d_meng/article/details/6251879
 
コードは以下の通りです
 
package threadSync;
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  
/**
 * Base type shared by every warehouse ("storage") implementation in this file.
 *
 * <p>The default methods only print a trace line; subclasses override them
 * with real producer/consumer logic.
 */
class SupperStorage
{
    /**
     * Default producer hook: prints a trace line and ignores {@code num}.
     *
     * @param num number of items requested (unused here)
     */
    public void produce(int num)
    {
        System.out.println("SupperStorage::produce()");
    }

    /**
     * Default consumer hook: prints a trace line and ignores {@code num}.
     *
     * @param num number of items requested (unused here)
     */
    public void consume(int num)
    {
        System.out.println("SupperStorage::consume()");
    }
}
 /**
  *    Storage3       with LinkedBlockingQueue
  * 
  */
  class Storage3 extends SupperStorage
 {
  //        
  private final int MAX_SIZE = 200;
  //        
  private LinkedBlockingQueue<Object>list=new LinkedBlockingQueue<Object>(MAX_SIZE);
  //   num   
  public void produce(int num)
  {
   //      is full
   if (list.size() == MAX_SIZE)
   {
    System.out.println("【   】 is full: " + MAX_SIZE + "           !");
   }
   //          ,  num   
   for (int i = 1; i <= num; ++i)
   {
    try
    {
     //     ,    
     list.put(new Object());
    }
    catch (InterruptedException e)
    {
     e.printStackTrace();
    }
    System.out.println("【     】: " + list.size());
   }
  }
  //   num   
  public void consume(int num)
  {
   //          
   if (list.size() == 0)
   {
    System.out.println("【   】 is empty,           !");
   }
   //          ,  num   
   for (int i = 1; i <= num; ++i)
   {
    try
    {
     //     ,    
     list.take();
    }
    catch (InterruptedException e)
    {
     e.printStackTrace();
    }
   }
   System.out.println("【     】: " + list.size());
  }
  // set/get  
  public LinkedBlockingQueue<Object> getList()
  {
   return list;
  }
  public void setList(LinkedBlockingQueue<Object> list)
  {
   this.list = list;
  }
  public int getMAX_SIZE()
  {
   return MAX_SIZE;
  }
 }
  
 /**
  *    Storage2      with await() / signal()
  * 
  */
 class Storage2 extends SupperStorage
 {
  //        
  private final int MAX_SIZE = 200;
  //        
  private LinkedList<Object> list = new LinkedList<Object>();
  //  
  private final Lock lock = new ReentrantLock();
  //         
  private final Condition full = lock.newCondition();
  //         
  private final Condition empty = lock.newCondition();
  //   num   
  public void produce(int num)
  {
   //    
   lock.lock();
   //           
   while (list.size() + num > MAX_SIZE)
   {
    System.out.println("【        】: " + num + " 【   】:" + list.size()
            + "           !");
    try
    {
     //        ,    
     System.out.println(Thread.currentThread().getName()+" enter await");
     full.await();
    }
    catch (InterruptedException e)
    {
     e.printStackTrace();
    }
   }
   //          ,  num   
   for (int i = 1; i <= num; ++i)
   {
    list.add(new Object());
   }
   System.out.println(Thread.currentThread().getName() + " 【       】:" + num + " 【     】:" + list.size());
   //         
   full.signalAll();
   empty.signalAll();
   //    
   lock.unlock();
  }
  //   num   
  public void consume(int num)
  {
   //    
   lock.lock();
   //          
   while (list.size() < num)
   {
    System.out.println( Thread.currentThread().getName() + "【        】: " + num + " 【   】:" + list.size()
            + "           !");
    try
    {
     //        ,    
     System.out.println(Thread.currentThread().getName()+" enter await");
     empty.await();
    }
    catch (InterruptedException e)
    {
     e.printStackTrace();
    }
   }
   //          ,  num   
   for (int i = 1; i <= num; ++i)
   {
    list.remove();
   }
   System.out.println(Thread.currentThread().getName() + " 【       】: " + num + " 【     】:" + list.size());
   //         
   full.signalAll();
   empty.signalAll();
   //    
   lock.unlock();
  }
  // set/get  
  public int getMAX_SIZE()
  {
   return MAX_SIZE;
  }
  public LinkedList<Object> getList()
  {
   return list;
  }
  public void setList(LinkedList<Object> list)
  {
   this.list = list;
  }
 }
 
 // Storage with wait() / notify()
 
 class Storage extends SupperStorage {
 //        
 private final int MAX_SIZE = 200;
 //        
 private LinkedList<Object> list = new LinkedList<Object>();
 //   num   
 public void produce(int num)
 {
  //      
  synchronized (list)
  {
   //           
   while (list.size() + num > MAX_SIZE)
   {
    System.out.println("【        】:" + num + "/t【   】:"
            + list.size() + "/t          !");
    try
    {
     //        ,    
     list.wait();
    }
    catch (InterruptedException e)
    {
     e.printStackTrace();
    }
   }
   //          ,  num   
   for (int i = 1; i <= num; ++i)
   {
    list.add(new Object());
   }
   System.out.println("【       】:" + num + "/t【     】:" + list.size());
   list.notifyAll();
  }
 }
 //   num   
 public void consume(int num)
 {
  //      
  synchronized (list)
  {
   //          
   while (list.size() < num)
   {
    System.out.println("【        】:" + num + "/t【   】:"
            + list.size() + "/t          !");
    try
    {
     //        ,    
     list.wait();
    }
    catch (InterruptedException e)
    {
     e.printStackTrace();
    }
   }
   //          ,  num   
   for (int i = 1; i <= num; ++i)
   {
    list.remove();
   }
   System.out.println("【       】:" + num + "/t【     】:" + list.size());
   list.notifyAll();
  }
 }
 // get/set  
 public LinkedList<Object> getList()
 {
  return list;
 }
 public void setList(LinkedList<Object> list)
 {
  this.list = list;
 }
 public int getMAX_SIZE()
 {
  return MAX_SIZE;
 }
}
 
 
 /*
  *     Producer     Thread 
   *  
  */  
 class Producer extends Thread  
 {  
     //             
     private int num;  
   
     //           
     private SupperStorage storage;  
   
     //     ,       
     public Producer(SupperStorage storage)  
     {  
         this.storage = storage;  
     }  
   
     //   run     
     public void run()  
     {  
         produce(num);  
     }  
   
     //     Storage        
     public void produce(int num)  
     {  
         storage.produce(num);  
     }  
   
     // get/set     
     public int getNum()  
     {  
         return num;  
     }  
   
     public void setNum(int num)  
     {  
         this.num = num;  
     }  
   
     public SupperStorage getStorage()  
     {  
         return storage;  
     }  
   
     public void setStorage(SupperStorage storage)  
     {  
         this.storage = storage;  
     }  
 }  
 class Producer2 implements Runnable  
 {  
     //             
     private int num;  
   
     //           
     private SupperStorage storage;  
   
     //     ,       
     public Producer2(SupperStorage storage)  
     {  
         this.storage = storage;  
     }  
   
     //   run     
     public void run()  
     {  
         produce(num);  
     }  
   
     //     Storage        
     public void produce(int num)  
     {  
         storage.produce(num);  
     }  
   
     // get/set     
     public int getNum()  
     {  
         return num;  
     }  
   
     public void setNum(int num)  
     {  
         this.num = num;  
     }  
   
     public SupperStorage getStorage()  
     {  
         return storage;  
     }  
   
     public void setStorage(SupperStorage storage)  
     {  
         this.storage = storage;  
     }  
 }  
 class Consumer extends Thread
{
 //          
 private int num;
 //        
 private SupperStorage storage;
 //     ,    
 public Consumer(SupperStorage storage)
 {
  this.storage = storage;
 }
 //   run  
 public void run()
 {
  consume(num);
 }
 //     Storage     
 public void consume(int num)
 {
  storage.consume(num);
 }
 // get/set  
 public int getNum()
 {
  return num;
 }
 public void setNum(int num)
 {
  this.num = num;
 }
 public SupperStorage getStorage()
 {
  return storage;
 }
 public void setStorage(SupperStorage storage)
 {
  this.storage = storage;
 }
}
 
 class Consumer2 implements Runnable
 {
  //          
  private int num;
  //        
  private SupperStorage storage;
  //     ,    
  public Consumer2 (SupperStorage storage)
  {
   this.storage = storage;
  }
  //   run  
  public void run()
  {
   consume(num);
  }
  //     Storage     
  public void consume(int num)
  {
   storage.consume(num);
  }
  // get/set  
  public int getNum()
  {
   return num;
  }
  public void setNum(int num)
  {
   this.num = num;
  }
  public SupperStorage getStorage()
  {
   return storage;
  }
  public void setStorage(SupperStorage storage)
  {
   this.storage = storage;
  }
 }
/**
 *    Test
 * 
 */
public class mythreadsync
{
 //this method is used to test thread generated via class Thread 
 public void test_thread() {
   
  System.out.println("test_thread(): enter");
   //SupperStorage storage = new Storage3();
   //SupperStorage storage = new Storage();
   SupperStorage storage = new Storage2();
   //      
   Producer p1 = new Producer(storage);
   Producer p2 = new Producer(storage);
   Producer p3 = new Producer(storage);
   Producer p4 = new Producer(storage);
   Producer p5 = new Producer(storage);
   Producer p6 = new Producer(storage);
   Producer p7 = new Producer(storage);
   //      
   Consumer c1 = new Consumer(storage);
   Consumer c2 = new Consumer(storage);
   Consumer c3 = new Consumer(storage);
   //            
   p1.setNum(10);
   p2.setNum(10);
   p3.setNum(10);
   p4.setNum(10);
   p5.setNum(10);
   p6.setNum(10);
   p7.setNum(80);
   //            
   c1.setNum(50);
   c2.setNum(20);
   c3.setNum(30);
   //       
   c1.start();
   c2.start();
   c3.start();
   
   p1.start();
   p2.start();
   p3.start();
   p4.start();
   p5.start();
   p6.start();
   p7.start();
   System.out.println("test_thread(): exit");
 }
 
 //this method is used to test threads implementing interface runnable
 
 public void test_thread_runnable() {
  
  System.out.println("test_thread_runnable(): enter");
  SupperStorage storage = new Storage2();
  //      
  
  Producer2 p1 = new Producer2(storage);
  Producer2 p2 = new Producer2(storage);
  Producer2 p3 = new Producer2(storage);
  Producer2 p4 = new Producer2(storage);
  Producer2 p5 = new Producer2(storage);
  Producer2 p6 = new Producer2(storage);
  Producer2 p7 = new Producer2(storage);
  //      
  Consumer2 c1 = new Consumer2(storage);
  Consumer2 c2 = new Consumer2(storage);
  Consumer2 c3 = new Consumer2(storage);
  //            
  p1.setNum(10);
  p2.setNum(10);
  p3.setNum(10);
  p4.setNum(10);
  p5.setNum(10);
  p6.setNum(20);
  p7.setNum(80);
  //            
  c1.setNum(50);
  c2.setNum(20);
  c3.setNum(30);
  //       
  new Thread(c1,"consumer 1").start();
  new Thread(c2,"consumer 2").start();
  new Thread(c3,"consumer 3").start();
  
  new Thread(p1,"producer 1").start();
  new Thread(p2,"producer 2").start();
  new Thread(p3,"producer 3").start();
  new Thread(p4,"producer 4").start();
  new Thread(p5,"producer 5").start();
  new Thread(p6,"producer 6").start();
  new Thread(p7,"producer 7").start();
  System.out.println("test_thread_runnable(): exit");
    }
 
 //this method is used to test threads pool for thread implementing interface runnable
 
  public void test_threadpool_runnal() {
   
   System.out.println("test_threadpool_runnal(): enter");
   SupperStorage storage = new Storage2();
   
   //      
   Producer2 p1 = new Producer2(storage);
   Producer2 p2 = new Producer2(storage);
   Producer2 p3 = new Producer2(storage);
   Producer2 p4 = new Producer2(storage);
   Producer2 p5 = new Producer2(storage);
   Producer2 p6 = new Producer2(storage);
   Producer2 p7 = new Producer2(storage);
   //      
   Consumer2 c1 = new Consumer2(storage);
   Consumer2 c2 = new Consumer2(storage);
   Consumer2 c3 = new Consumer2(storage);
   //            
   p1.setNum(10);
   p2.setNum(20);
   p3.setNum(30);
   p4.setNum(40);
   p5.setNum(50);
   p6.setNum(60);
   p7.setNum(70);
   //            
   c1.setNum(30);
   c2.setNum(50);
   c3.setNum(80);
   
   ExecutorService pool=Executors.newFixedThreadPool(5);//         5      
   //       
   // 3 consumers
   pool.submit(c1);
   pool.submit(c2);
   pool.submit(c3);
   
   // 7 producers
   pool.submit(p1);
   pool.submit(p2);
   pool.submit(p3);
   pool.submit(p4);
   pool.submit(p5);
   pool.submit(p6);
   pool.submit(p7);
/*
 * it shows, at most, only 5 working threads in parallel even if 10 tasks submitted.
 * 
test_threadpool_runnal(): enter
pool-1-thread-1【        】: 30 【   】:0           !
pool-1-thread-1 enter await
pool-1-thread-2【        】: 50 【   】:0           !
pool-1-thread-2 enter await
pool-1-thread-4 【       】:10 【     】:10
test_threadpool_runnal(): exit
pool-1-thread-5 【       】:20 【     】:30
pool-1-thread-3【        】: 80 【   】:30           !
pool-1-thread-3 enter await
pool-1-thread-1 【       】: 30 【     】:0
pool-1-thread-1 【       】:50 【     】:50
pool-1-thread-1 【       】:60 【     】:110
pool-1-thread-1 【       】:70 【     】:180
pool-1-thread-2 【       】: 50 【     】:130
pool-1-thread-4 【       】:30 【     】:160
pool-1-thread-5 【       】:40 【     】:200
pool-1-thread-3 【       】: 80 【     】:120
 */
   pool.shutdown();
   System.out.println("test_threadpool_runnal(): exit");
     }
  
 public static void main(String[] args) {
  mythreadsync my = new mythreadsync();
  //my.test_thread();
  my.test_threadpool_runnal();
  //my.test_thread_runnable();
  
 }
}