BlockingQueueの機能と使用

4548 ワード

synchronizedを使用する場合、どのスレッドがロックを取得するかは予想できませんが、BlockingQueueはスレッドが順番にロックを取得することを実現できます.
BlockingQueueの主なAPI
主な方法はput、takeの一対のブロックアクセスである.add、pollの一対の非ブロックアクセス.
  • 挿入:
  • add(anObject):anObjectをBlockingQueueに追加します.すなわち、BlockingQueueが収容できる場合はtrueに戻ります.そうしないと異常を放出し、
  • はよくありません.
  • offer(anObject):可能であれば、anObjectをBlockingQueueに追加することを示す.すなわち、BlockingQueueが収容可能であればtrueを返し、そうでなければfalseを返す.
  • put(anObject):anObjectをBlockingQueueに追加し、BlockQueueにスペースがなければ、このメソッドを呼び出すスレッドはBlockingQueueの中にスペースがあるまでブロックされ、ブロックがあり、入れないと
  • を待つ
  • 読み込み:
  • poll(time):BlockingQueueで1位のオブジェクトを取り出し、すぐに取り出せない場合はtimeパラメータで規定された時間を待つことができ、取り出せない場合はnullに戻る.戻りnull
  • が取れません
  • take():BlockingQueueの中で1位のオブジェクトを取り出し、BlockingQueueが空であれば、BlockingQueueが新しいオブジェクトが加わるまで待機状態に入ることを遮断する.ブロックして、取れないならずっと待っています.
  • その他:
  • int remainingCapacity();キューの残りの容量を返し、キューの挿入と取得時に、でたらめをしないでください.データは正確ではないかもしれません.データの正確性は保証できません.
  • boolean remove(Object o); キューから要素を除去し、存在する場合、すなわち1つ以上を除去すると、キューはtrue
  • を返すように変更される.
  • public boolean contains(Object o); キューにこの要素が存在するかどうかを確認し、true
  • を返します.
  • int drainTo(Collection


  • BlockingQueueの実装クラス
    BlockingQueueには4つの具体的な実装クラスがあり、よく使われる2つの実装クラスは次のとおりです.
    1、ArrayBlockingQueue:配列によってサポートされる境界ブロックキューであり、所定のサイズのBlockingQueueであり、その構造関数はintパラメータを持ってそのサイズを指定しなければならない.その含まれるオブジェクトはFIFO(先入先出)の順序で並べ替えられる.
    2、LinkedBlockingQueue:大きさ不定のBlockingQueue、そのコンストラクション関数に所定の大きさのパラメータが付いていれば、生成されたBlockingQueueには大きさ制限があり、大きさパラメータが付いていなければ、生成されたBlockingQueueの大きさはInteger.MAX_VALUEによって決定される.その含まれる対象はFIFO(先入先出)順で並べ替えられる.
    LinkedBlockingQueueは容量を指定してもよいし、指定しなくてもよいし、デフォルトは最大Integer.MAX_VALUEで、主にputとtakeメソッドが用いられ、putメソッドはキューがいっぱいになるとキューメンバーが消費されるまでブロックされ、takeメソッドはキューが空いているときにブロックされ、キューメンバーが入るまでブロックされます.
    LinkedBlockingQueueとArrayBlockingQueueの違い:
    LinkedBlockingQueueとArrayBlockingQueueを比較すると、それらの背後に用いられるデータ構造が異なり、LinkedBlockingQueueのデータスループットはArrayBlockingQueueよりも大きいが、スレッド数が多い場合にはArrayBlockingQueueよりも性能の予見性が低い.
    使用例:消費者と生産者
    Test.java
    package com.mrbcy.bigdata.basic.blockingqueue;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    
    
    public class Test {
        public static void main(String[] args) throws Exception {
            BlockingQueue queue = new LinkedBlockingQueue(2);
            // BlockingQueue queue = new LinkedBlockingQueue();
            //      ,LinkedBlockingQueue     Integer.MAX_VALUE
            // BlockingQueue queue = new ArrayBlockingQueue(2);
            Consumer consumer = new Consumer(queue);
            Producer producer = new Producer(queue);
            for (int i = 0; i < 3; i++) {
                new Thread(producer, "Producer" + (i + 1)).start();
            }
            for (int i = 0; i < 5; i++) {
                new Thread(consumer, "Consumer" + (i + 1)).start();
            }
    
            new Thread(producer, "Producer" + (5)).start();
        }
    }
    

    Consumer.java
    package com.mrbcy.bigdata.basic.blockingqueue;
    
    import java.util.concurrent.BlockingQueue;
    
    public class Consumer implements Runnable{  
        BlockingQueue queue; 
        public Consumer(BlockingQueue queue){  
            this.queue = queue;  
        }        
        @Override  
        public void run() {  
            try {  
                String consumer = Thread.currentThread().getName();
                System.out.println(consumer);  
                String temp = queue.take();//      ,         
                System.out.println(consumer+"get a product:"+temp);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
    }  
    

    Producer.java
    package com.mrbcy.bigdata.basic.blockingqueue;
    
    import java.util.concurrent.BlockingQueue;
    
    public class Producer implements Runnable {  
        BlockingQueue queue;    
        public Producer(BlockingQueue queue) {  
            this.queue = queue;  
        }    
        @Override  
        public void run() {  
            try {  
    
                System.out.println("I have made a product:"  
                        + Thread.currentThread().getName()); 
                String temp = "A Product,     :"  
                        + Thread.currentThread().getName();  
                queue.put(temp);//        ,         
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }    
    }