Java同時53:同時集合シリーズ-排他ロック+PriorityBlockingQueueに基づいて実現される一方向ブロック無境界遅延キューDelayQueue


[ハイパーリンク:Java同時学習シリーズ-緒論][シリーズ序章:Java同時43:同時集合シリーズ-序章]
原文住所:http://cmsblogs.com/?p=2413
DelayQueueは、遅延取得要素をサポートする無境界ブロックキューです.
中の要素はすべて「延期可能」の要素で、列の先頭の要素は最初に「期限切れ」の要素です.
キューにエレメントが期限切れになっていない場合は、エレメントがあってもカラムヘッダからエレメントを取得できません.
つまり、キューから要素を取得できるのは遅延期間の時だけです.
DelayQueueは主に2つの側面に使用されます.
  • キャッシュ:キャッシュ中にタイムアウトしたキャッシュデータを消去する
  • タスクタイムアウト処理
  • DelayQueue
    DelayQueueの実現の鍵は主に以下の通りである.
  • 再ロック可能ReentrantLock
  • ブロックおよび通知のためのConditionオブジェクト
  • Delay時間による優先順位キュー:PriorityQueue
  • ブロック通知を最適化するためのスレッド要素leader
  • ReentrantLock、Conditionの2つのオブジェクトは説明する必要はありません.彼はBlockingQueue全体を実現する核心です.
    PriorityQueueは、優先スレッドのソートをサポートするキューです(Java同時52:PriorityBlockingQueue参照).
    リーダーは後述する.ここではまずDelayを理解します.彼は遅延操作を実現する鍵です.
    Delayed
    Delayedインタフェースは、所定の遅延時間後に実行すべきオブジェクトをマークするために使用されます.
    オブジェクトに関連する残りの時間を返すlong getDelay(TimeUnit unit)メソッドを定義します.
    同時にインタフェースを実装するオブジェクトは、このインタフェースのgetDelayメソッドと一致するソートを提供するcompareToメソッドを定義する必要があります.
    public interface Delayed extends Comparable<Delayed> {
        long getDelay(TimeUnit unit);
    }

    このインタフェースはどのように使用しますか?前述したように,このインタフェースを実現するgetDelay()メソッドと,compareTo()メソッドを定義すればよい.
    ないぶこうぞう
    まずDelayQueueの定義を見てみましょう.
    public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
            implements BlockingQueue {
        /**      */
        private final transient ReentrantLock lock = new ReentrantLock();
        /**       BlockingQueue */
        private final PriorityQueue q = new PriorityQueue();
        /**        */
        private Thread leader = null;
        /** Condition */
        private final Condition available = lock.newCondition();
    
        /**
         *       
         */
    }

    DelayQueueの内部構造を見ると、上のいくつかのキーが一目瞭然です.
    しかし、ここで注意しなければならないのは、DelayQueueの要素がDelayedインタフェースを継承しなければならないことです.
    同時に、ここからDelayQueueの内部実現のメカニズムを初歩的に理解することができます.
  • は、優先順位の無境界キューをサポートするPriorityQueueをコンテナとして、
  • 容器の中の元素はすべてDelayedインタフェースを実現すべきで、
  • は、優先キューに要素を追加するたびに、要素の有効期限をソート条件とし、
  • である.
  • で最初に期限が切れた要素は、最も優先度が高い.

  • offer()メソッド
    public boolean offer(E e) {
       final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //   PriorityQueue     
            q.offer(e);
            //            (     ),leader    ,        
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            //     ,    true
            return true;
        } finally {
            lock.unlock();
        }
    }

    offer(E e)とは、PriorityQueueに要素を追加することであり、具体的には(Java同時52:PriorityBlockingQueueを参照).
    プロセス全体は比較的簡単ですが、現在の要素がヘッダ要素であるかどうかを判断し、そうであればleader=nullを設定することは非常に重要なステップであり、後述します.
    take()メソッド
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                //     
                E first = q.peek();
                //     ,  ,  off()    
                if (first == null)
                    available.await();
                else {
                    //            
                    long delay = first.getDelay(NANOSECONDS);
                    // <=0      ,  ,return
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    // leader != null           ,  
                    if (leader != null)
                        available.await();
                    else {
                        //    leader        ,  
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            //     
                            available.awaitNanos(delay);
                        } finally {
                            //   leader
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            //       
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

    まず、対ヘッダ要素を取得し、対ヘッダ要素の遅延時間delay<=0であれば、対ヘッダ要素を直接returnすればよい.
    そうでない場合はfirst=nullを設定し、ここでnullに設定する主な目的はメモリの漏洩を避けることです.
    リーダーが!nullは、現在スレッドが占有されている場合、ブロックされていることを示し、そうでない場合、leaderを現在のスレッドに設定し、awaitNanos()メソッドを呼び出してタイムアウト待機します.
    first = null
    ここでfirst=nullを設定しないとメモリ漏れを引き起こすのはなぜですか?
    スレッドAが到着し、カラムヘッダ要素が期限切れになっていません.leader=スレッドAを設定します.このとき、スレッドBが来ました.leader!=nullは、スレッドCと同じようにブロックされます.スレッドのブロックが完了した場合、カラムの先頭要素の取得に成功し、カラムを出力します.この時点で列ヘッダ要素は回収されるはずですが、問題はスレッドB、スレッドCが持っているので回収されないことです.ここには2つのスレッドしかありません.スレッドD、スレッドEがあれば...?これにより無期限に回収できなくなり、メモリが漏れてしまいます.
    このエンキュー、アウト・ペア・プロシージャは、他のブロック・キューと大きく異なりません.アウト・ペアのときに期限切れの判断を増やしたにすぎません.
    同時にリーダーによって不要なブロックを減らす.