生産者/消費者モデルのJavaによる実装方式


Javaを使用して生産者/消費者モデルを実現する方法:

 1. wait()/notify()  
 2. await()/signal()  
 3. BlockingQueue      

wait()/notify()方式:synchronizedブロックと、Objectのwait()/notifyAll()を使用して実装します。
/**
 * Bounded storage shared by producer and consumer threads.
 *
 * <p>Coordination uses the intrinsic monitor of the backing list:
 * {@code synchronized (list)} guards every access, {@code list.wait()} parks a
 * thread whose request cannot be satisfied yet, and {@code list.notifyAll()}
 * wakes all waiting threads after the contents change.
 */
public class Storage {
    // Maximum number of items the storage may hold at once.
    private final int MAX_SIZE = 10;
    // Backing container; its monitor doubles as the lock for product/consume.
    private LinkedList<Object> list = new LinkedList<Object>();

    /**
     * Adds {@code num} items to the storage.
     *
     * <p>Waits while adding {@code num} items would exceed the capacity. A
     * request with {@code num} greater than the capacity can never be satisfied
     * and therefore waits until the thread is interrupted.
     *
     * <p>If the calling thread is interrupted while waiting, nothing is added,
     * the thread's interrupt flag is set again and the method returns.
     *
     * @param num number of items to add
     */
    public void product(int num) {
        synchronized (list) {
            // Re-check in a loop: a woken thread must confirm there is room.
            while (list.size() + num > MAX_SIZE) {
                System.out.println("【        】:" + num + "\t【   】:"
                        + list.size() + "\t          !");
                try {
                    list.wait();
                } catch (InterruptedException e) {
                    // Preserve the interrupt for the caller and give up.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            for (int i = 0; i < num; i++) {
                list.add(new Object());
            }
            System.out.println("【       】:" + num + "\t【     】:" + list.size());

            // Contents changed: let every waiting thread re-evaluate its condition.
            list.notifyAll();
        }
    }

    /**
     * Removes {@code num} items from the head of the storage.
     *
     * <p>Waits while fewer than {@code num} items are stored. If the calling
     * thread is interrupted while waiting, nothing is removed, the thread's
     * interrupt flag is set again and the method returns.
     *
     * @param num number of items to remove
     */
    public void consume(int num) {
        synchronized (list) {
            // Re-check in a loop: a woken thread must confirm enough items exist.
            while (list.size() < num) {
                System.out.println("【        】:" + num + "\t【   】:"
                        + list.size() + "\t          !");
                try {
                    list.wait();
                } catch (InterruptedException e) {
                    // Preserve the interrupt for the caller and give up.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            for (int i = 1; i <= num; ++i) {
                list.remove();
            }
            System.out.println("【       】:" + num + "\t【     】:" + list.size());

            // Space was freed: let every waiting thread re-evaluate its condition.
            list.notifyAll();
        }
    }

    /**
     * Returns the live backing list (not a copy).
     *
     * @return the backing list
     */
    public LinkedList<Object> getList() {
        return list;
    }

    /**
     * Replaces the backing list. This method performs no synchronization, and
     * product/consume lock on whichever list is current when they are entered.
     *
     * @param list the new backing list
     */
    public void setList(LinkedList<Object> list) {
        this.list = list;
    }

    /**
     * Returns the storage capacity.
     *
     * @return the maximum number of items
     */
    public int getMAX_SIZE() {
        return MAX_SIZE;
    }
}

/**
 * Producer thread: when run, it asks the shared {@link Storage} to add the
 * configured number of items.
 */
public class Producer extends Thread {
    // Storage that receives the produced items.
    private Storage storage;

    // Number of items produced per run; remains 0 until setNum is called.
    private int num;

    /**
     * Creates a producer bound to the given storage.
     *
     * @param storage storage to add items to
     */
    public Producer(Storage storage) {
        this.storage = storage;
    }

    /** Thread body: produces the configured number of items. */
    @Override
    public void run() {
        this.produce(this.num);
    }

    /**
     * Delegates to {@code Storage.product}.
     *
     * @param num number of items to produce
     */
    public void produce(int num) {
        this.storage.product(num);
    }

    /** @return the number of items produced per run */
    public int getNum() {
        return this.num;
    }

    /** @param num the number of items to produce per run */
    public void setNum(int num) {
        this.num = num;
    }

    /** @return the storage this producer adds to */
    public Storage getStorage() {
        return this.storage;
    }

    /** @param storage the storage this producer should add to */
    public void setStorage(Storage storage) {
        this.storage = storage;
    }
}
/**
 * Consumer thread: when run, it asks the shared {@link Storage} to remove the
 * configured number of items.
 */
public class Consumer extends Thread {
    // Storage that items are taken from.
    private Storage storage;

    // Number of items consumed per run; remains 0 until setNum is called.
    private int num;

    /**
     * Creates a consumer bound to the given storage.
     *
     * @param storage storage to remove items from
     */
    public Consumer(Storage storage) {
        this.storage = storage;
    }

    /** Thread body: consumes the configured number of items. */
    @Override
    public void run() {
        this.consume(this.num);
    }

    /**
     * Delegates to {@code Storage.consume}.
     *
     * @param num number of items to consume
     */
    public void consume(int num) {
        this.storage.consume(num);
    }

    /** @return the number of items consumed per run */
    public int getNum() {
        return this.num;
    }

    /** @param num the number of items to consume per run */
    public void setNum(int num) {
        this.num = num;
    }

    /** @return the storage this consumer removes from */
    public Storage getStorage() {
        return this.storage;
    }

    /** @param storage the storage this consumer should remove from */
    public void setStorage(Storage storage) {
        this.storage = storage;
    }
}
/**
 * Demo driver: three producers and three consumers share one {@link Storage}.
 */
public class Test01 {
    /**
     * Wires up the threads and starts them, consumers first.
     *
     * <p>The producers supply 2 + 3 + 4 = 9 items in total while the consumers
     * request 5 + 2 + 3 = 10, so with the Storage variants shown alongside this
     * class at least one consumer stays blocked.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Storage warehouse = new Storage();

        // Per-thread quantities, in creation order.
        int[] producedCounts = {2, 3, 4};
        int[] consumedCounts = {5, 2, 3};

        // Producers are created before consumers, as in the original listing.
        Producer[] producers = new Producer[producedCounts.length];
        for (int i = 0; i < producers.length; i++) {
            producers[i] = new Producer(warehouse);
        }

        Consumer[] consumers = new Consumer[consumedCounts.length];
        for (int i = 0; i < consumers.length; i++) {
            consumers[i] = new Consumer(warehouse);
        }

        for (int i = 0; i < producers.length; i++) {
            producers[i].setNum(producedCounts[i]);
        }
        for (int i = 0; i < consumers.length; i++) {
            consumers[i].setNum(consumedCounts[i]);
        }

        // Consumers start first, then producers.
        for (Consumer consumer : consumers) {
            consumer.start();
        }
        for (Producer producer : producers) {
            producer.start();
        }
    }
}

await()/signal()方式:LockとConditionを使用して実装します。ArrayBlockingQueueも、内部では1つのロックと2つの条件キュー(Condition)を使用して実現されています。
/**
 * Bounded storage shared by producer and consumer threads.
 *
 * <p>Coordination uses one {@link ReentrantLock} with two conditions:
 * producers wait on {@code full} and are signalled by consumers; consumers
 * wait on {@code empty} and are signalled by producers.
 */
public class Storage {
    // Maximum number of items the storage may hold at once.
    private final int MAX_SIZE = 10;
    // Backing container; only accessed by product/consume while holding the lock.
    private LinkedList<Object> list = new LinkedList<Object>();

    private final Lock lock = new ReentrantLock();
    // Producers wait here while there is not enough room.
    private final Condition full = lock.newCondition();
    // Consumers wait here while there are not enough items.
    private final Condition empty = lock.newCondition();

    /**
     * Adds {@code num} items to the storage.
     *
     * <p>Waits while adding {@code num} items would exceed the capacity. A
     * request with {@code num} greater than the capacity can never be satisfied
     * and therefore waits until the thread is interrupted.
     *
     * <p>If the calling thread is interrupted while waiting, nothing is added,
     * the thread's interrupt flag is set again and the method returns. The lock
     * is released on every exit path.
     *
     * @param num number of items to add
     */
    public void product(int num) {
        lock.lock();
        try {
            // Re-check in a loop: a signalled thread must confirm there is room.
            while (list.size() + num > MAX_SIZE) {
                System.out.println("【        】:" + num + "\t【   】:"
                        + list.size() + "\t          !");
                try {
                    full.await();
                } catch (InterruptedException e) {
                    // Preserve the interrupt for the caller and give up.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            for (int i = 0; i < num; i++) {
                list.add(new Object());
            }
            System.out.println("【       】:" + num + "\t【     】:" + list.size());

            // Items were added: wake the waiting consumers.
            empty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes {@code num} items from the head of the storage.
     *
     * <p>Waits while fewer than {@code num} items are stored. If the calling
     * thread is interrupted while waiting, nothing is removed, the thread's
     * interrupt flag is set again and the method returns. The lock is released
     * on every exit path.
     *
     * @param num number of items to remove
     */
    public void consume(int num) {
        lock.lock();
        try {
            // Re-check in a loop: a signalled thread must confirm enough items exist.
            while (list.size() < num) {
                System.out.println("【        】:" + num + "\t【   】:" + list.size()
                        + "\t          !");
                try {
                    empty.await();
                } catch (InterruptedException e) {
                    // Preserve the interrupt for the caller and give up.
                    Thread.currentThread().interrupt();
                    return;
                }
            }

            for (int i = 1; i <= num; ++i) {
                list.remove();
            }

            System.out.println("【       】:" + num + "\t【     】:" + list.size());

            // Space was freed: wake the waiting producers.
            full.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the live backing list (not a copy).
     *
     * @return the backing list
     */
    public LinkedList<Object> getList() {
        return list;
    }

    /**
     * Replaces the backing list. This method does not acquire the lock.
     *
     * @param list the new backing list
     */
    public void setList(LinkedList<Object> list) {
        this.list = list;
    }

    /**
     * Returns the storage capacity.
     *
     * @return the maximum number of items
     */
    public int getMAX_SIZE() {
        return MAX_SIZE;
    }
}

BlockingQueue(ブロッキングキュー)方式:ブロッキングキューを使用して生産者/消費者モデルを実装します。put()メソッド:上記の生産者スレッドと同様に、容量が上限に達している場合は自動的にブロックされます。take()メソッド:上記の消費者スレッドと同様に、キューが空(要素数が0)の場合は自動的にブロックされます。
/**
 * Bounded storage shared by producer and consumer threads, backed by a
 * {@link LinkedBlockingQueue}. Blocking is delegated to the queue:
 * {@code put} waits while the queue is full and {@code take} waits while it is
 * empty.
 */
public class Storage {
    // Maximum number of items the storage may hold; also the queue capacity.
    private final int MAX_SIZE = 100;

    // Backing queue, bounded to MAX_SIZE.
    private LinkedBlockingQueue<Object> list = new LinkedBlockingQueue<Object>(
            MAX_SIZE);

    /**
     * Adds {@code num} items one at a time, printing the size after each one.
     *
     * <p>A notice is printed first if the queue is already full. If the calling
     * thread is interrupted while adding, the remaining items are not added,
     * the thread's interrupt flag is set again and the method returns.
     *
     * @param num number of items to add
     */
    public void produce(int num) {
        // Informational only; the actual blocking happens inside put().
        if (list.size() == MAX_SIZE) {
            System.out.println("【   】:" + MAX_SIZE + "\t          !");
        }

        for (int i = 1; i <= num; ++i) {
            try {
                list.put(new Object());
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller and stop producing.
                Thread.currentThread().interrupt();
                return;
            }

            System.out.println("【     】:" + list.size());
        }
    }

    /**
     * Removes {@code num} items one at a time, then prints the resulting size.
     *
     * <p>A notice is printed first if the queue is empty. If the calling thread
     * is interrupted while removing, the remaining items are not removed, the
     * thread's interrupt flag is set again and the method returns.
     *
     * @param num number of items to remove
     */
    public void consume(int num) {
        // Informational only; the actual blocking happens inside take().
        if (list.size() == 0) {
            System.out.println("【   】:0\t          !");
        }

        for (int i = 1; i <= num; ++i) {
            try {
                list.take();
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller and stop consuming.
                Thread.currentThread().interrupt();
                return;
            }
        }

        System.out.println("【     】:" + list.size());
    }

    /**
     * Returns the live backing queue (not a copy).
     *
     * @return the backing queue
     */
    public LinkedBlockingQueue<Object> getList() {
        return list;
    }

    /**
     * Replaces the backing queue.
     *
     * @param list the new backing queue
     */
    public void setList(LinkedBlockingQueue<Object> list) {
        this.list = list;
    }

    /**
     * Returns the storage capacity.
     *
     * @return the maximum number of items
     */
    public int getMAX_SIZE() {
        return MAX_SIZE;
    }
}

原文参照:
http://blog.csdn.net/monkey_d_meng/article/details/6251879/