DelayQueueソース学習

7326 ワード

DelayQueueソース学習
DelayQueueは期限切れの時間を提供するキューで、待ち時間を消費した要素だけを返し、しばらくは適用シーンが見つかりませんでした...DelayQueueはBlockingQueueインタフェースを実現しているので、ブロック操作をサポートしています
  • まずエンキューしたい要素はDelayedインタフェースを実現しなければならない.まずDelayedインタフェースを見てみる:
  • //   Comparable  
    public interface Delayed extends Comparable {
    
        /**
         * Returns the remaining delay associated with this object, in the
         * given time unit.
         *
         * @param unit the time unit
         * @return the remaining delay; zero or negative values indicate
         * that the delay has already elapsed
         *      0       
         */
        long getDelay(TimeUnit unit);
    }
    

    次はDelayedインタフェースを実装したクラスです.
    class DelayBean implements Delayed{
        private long time;
        
        
    
        @Override
        public int compareTo(Delayed o) {
            //     1,    -1,    0
            return this.getDelay(TimeUnit.NANOSECONDS) > o.getDelay(TimeUnit.NANOSECONDS) ? 1 : 
                (this.getDelay(TimeUnit.NANOSECONDS) < o.getDelay(TimeUnit.NANOSECONDS) ? -1 : 0);
        }
    
        //                
        @Override
        public long getDelay(TimeUnit unit) {
            // TODO Auto-generated method stub
            return unit.convert(time - System.nanoTime(), TimeUnit.NANOSECONDS);
        }
    
        public long getTime() {
            return time;
        }
    
        public void setTime(int time) {
            this.time = time;
        }
        
    }
    
    
  • 次にDelayQueueのメンバー変数を見てみましょう:
  •     //   
        private final transient ReentrantLock lock = new ReentrantLock();
        //                 
        private final PriorityQueue q = new PriorityQueue();
        //        ,    ,Leader-Follower   , (  )[http://www.cs.wustl.edu/~schmidt/POSA/POSA2/]
        private Thread leader = null;
        
        private final Condition available = lock.newCondition();
    

    ps:PriorityQueueについてはこちらをご覧ください.PriorityQueueを集約して優先順位キューを保存しているので、DelayQueueクラスの主な精力は、この遅延機能をどのように実現するかです.
  • 入隊シリーズ方法
  • 集約PriorityQueueは配列に基づいて実装される無境界キューであるため、ここのoffer、putメソッドはブロックされず、これらのメソッドを呼び出すのは最終的にこのメソッドです.
     public boolean offer(E e) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                q.offer(e);
                //   leader      ,      take(),poll()      
                if (q.peek() == e) {
                    leader = null;
                    available.signal();
                }
                //   take(), poll()  ,       ,  leader   null   
                //        leader                           
                //    offer                         
                //               ,                    
                //        ,     leader  null,          
                //         leader,          
                return true;
            } finally {
                lock.unlock();
            }
        }
    
  • poll()メソッド
  • /**
    *         ,       PrirorityQueue(  pq)         
    *           ,       null,     pq poll  
    *                 
    **/
    public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                E first = q.peek();
                if (first == null || first.getDelay(NANOSECONDS) > 0)
                    return null;
                else
                    return q.poll();
            } finally {
                lock.unlock();
            }
        }
    
  • ブロック待ちtake()メソッド
  •  public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    E first = q.peek();
                    if (first == null)
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            return q.poll();
                        first = null; // don't retain ref while waiting
                        /**
                        *     leader   ,  leader  null,             
                        *    await(),   awaitNanos();
                        **/
                        if (leader != null)
                            available.await();
                        else {
                            //           ,  awaitNanos()            ,        
                            //await() awaitNanos()    ,    
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && q.peek() != null)//    peek()  null,         
                    available.signal();
                lock.unlock();
            }
        }
    
  • タイムアウト待機poll(TimeUnit timeUnit,long timeout)メソッド:
  •  public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    E first = q.peek();
                    if (first == null) {
                        if (nanos <= 0)
                            return null;
                        else
                            nanos = available.awaitNanos(nanos);
                    } else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            return q.poll();
                        if (nanos <= 0)
                            return null;
                        first = null; // don't retain ref while waiting
                        
                        /**
                        *                Thread leader   
                        *   leader   take       
                        **/
                        if (nanos < delay || leader != null)
                //   nanos < delay                  ,
                //           ,  poll     null ,
                //               
    
                //leader != null,              ,           
                //            ,      ,      ,          
                            nanos = available.awaitNanos(nanos);
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                //  nanos > delay     delay              
                                //         leader,        ,             
                                long timeLeft = available.awaitNanos(delay);
                                nanos -= delay - timeLeft;
                            } finally {
                                //        leader  null
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                        /**
                        *         offer  leader        
                        **/
                    }
                }
            } finally {
                if (leader == null && q.peek() != null)
                    available.signal();
                lock.unlock();
            }
        }
    

    したがってpoll()メソッドではleaderの役割は、キュータイムアウト要素を取得できないスレッドが待機時間を待ってnullに戻り、キュータイムアウト要素を取得できるスレッドがよりよくキュー要素を取得できるようにすることです.