Java優先キューDelayedWorkQueueの原理分析

15348 ワード

スレッドプールが実行されると、タスクキューからタスクが取得され、タスクが実行されることがわかります.遅延またはタイミングでタスクを実行したい場合は、タスクキューがタスク遅延時間によってソートされ、遅延時間が短いほどキューの前に並び、先に実行を取得されることが重要です.
キューは先進的な先出データ構造であり,まずキューに入るデータであり,先に取得される.しかし、挿入されたデータを優先順位付けし、優先順位の高いデータが最初に取得されることを保証する優先順位キューという特殊なキューがあります.
優先順位キューの効率的な実装の一般的な方法は、スタックを使用することです.
一.スタックによる優先キューの実装
一般的なソートアルゴリズムをまとめたこの論文では,スタックソートの実現を詳細に説明した.ここで振り返ってみましょう.
1.1ヒープとは
  • 最後のレイヤノードが満たされていない以外は、他のレイヤノードが満たされている、すなわち左右のノードが満たされている完全な二叉木です.
  • それは二叉探索木ではなく、すなわち左ノードの値は親ノードの値よりも小さく、右ノードの値は親ノードの値よりも小さくなく、このように検索すると、二分方式で効率は(log N)である.
  • 親ノードの値が子ノードの値より小さくならないようにする特殊なツリーです.これにより、大きな値が上にあり、小さな値が下にあることが保証されます.したがって、スタックの遍歴と検索は非効率です.ルートノードからリーフノードまでの各パスが降順であることしか知られていませんが、各パスの間には連絡がありません.値を検索すると、左ノードから検索すべきか右ノードから検索すべきか分かりません.
  • は、迅速な挿入と削除を実現し、効率は(log N)程度です.優先順位キューを実装できます.

  • スタックは二叉木ですが、最も簡単な方法は配列を通じて二叉木を実現することです.また、スタックは完全な二叉木であるため、配列空間の浪費はありません.配列を使用してツリーを格納するにはどうすればいいですか?
    二叉木の各ノードを配列の下付き記号でシミュレートします.例えば、ルートノードは0で、第1層の左ノードは1で、右ノードは2です.これにより、次の式が得られます.
    //   n       :
    int left = 2 * n + 1; //     
    int right = 2 * n + 2; //     
    int parent = (n - 1) / 2; //    ,  n   0,          
    

    スタックについては、insertの挿入とremoveの削除の2つの操作しかありません.保証スタックの成立条件を挿入しても削除しても、1.完全二叉木です.親ノードの値は、子ノードの値より小さくすることはできません.
      public void insert(int value) {
             //         ,          。      
             store[size++] = value;
             //           。
             int index = size - 1;
             while(index > 0) {
                 //          
                 int parentIndex = (index - 1) / 2;
                 //               ,        ,      
                 if (store[index] > store[parentIndex]) {
                     swap(store, index, parentIndex);
                     index = parentIndex;
                 } else {
                     //                  ,    
                     break;
                 }
             }
         }
    

    主な手順:
  • valueをsize位置に直接挿入し、sizeを自己増加させ、store配列に値を挿入します.
  • は、このリーフノードからルートノードへのパス上のノードが、親ノードの値が子ノードより小さくてはならないことを保証する.
  • はint parentIndex=(index-1)/2によって親ノードを取得し、親ノードの値より大きい場合、両方の位置の値を交換し、その後、この親ノードとその親ノードを比較する.このノードの値が親ノードの値より小さいか、このノードがルートノードであるかのいずれかになるまでループを終了します.

  • 私たちは毎回1つの値しか挿入しないので、新しい挿入位置のリーフノードからルートノードへの経路がスタックの条件を満たすことを保証するだけで、他の経路は操作していないので、条件を満たすに違いありません.第2はsize位置に直接値を挿入するので,完全二叉木であるという条件を満たすに違いない.各サイクルindexは2で割るという倍数で減算されるので,最大サイクル数は(log N)回である.
       public int remove() {
              //       ,    
              int result = store[0];
              //               
              store[0] = store[--size];
              int index = 0;
              //     ,              。
              while(true) {
                  int leftIndex = 2 * index + 1; //     
                  int rightIndex = 2 * index + 2; //     
                  // leftIndex >= size            。
                  if (leftIndex >= size) break;
                  int maxIndex = leftIndex;
                  if (store[leftIndex] < store[rightIndex]) maxIndex = rightIndex;
                  if (store[index] < store[maxIndex]) {
                      swap(store, index, maxIndex);
                      index = maxIndex;
                  } else {
                      break;
                  }
              }
              return result;
          }
    

    スタック内の最大値はルートノードにあるので、操作手順:
  • ルートノードの値をresultに保存します.
  • 最後のノードの値をルートノードに移動し、長さを1つ減らします.これにより、スタックが完全な二叉木である最初の条件を満たします.
  • は、親ノードの値が子ノードの値より小さくてはならないスタックが成立する第2の条件を満たすためにループを使用する.
  • は最後にresultを返します.

  • では、スタックの2番目の条件をどのように満たすのでしょうか.
    ルートポイントの値が新しい値になると、そのサブノードよりも小さくなる可能性があるため、交換する可能性があります.
  • 左サブノードと右サブノードの値がもっと大きいことを見つけます.この値は親ノードの値と交換される可能性があります.大きな値でなければ、親ノードと交換した後、親ノードの値が子ノードより小さいことがあります.
  • では、検出された大きなサブノード値と親ノード値が比較されます.
  • 親ノードの値がそれより小さい場合、親ノードと大きな子ノードの値を交換し、大きな子ノードとその子ノードを比較します.
  • 親ノードの値が子ノードより大きい値でない場合、または子ノードがない場合(すなわち、このノードはすでにリーフノードである)、ループから飛び出します.
  • サイクルごとに2の倍数で増加するので、最大サイクル数は(log N)回です.

  • したがって,スタックにより優先順位キューを迅速に実現することができ,その挿入と削除操作の効率はいずれもO(log N)である.
    二.DelayedWorkQueueクラス
        static class DelayedWorkQueue extends AbstractQueue
            implements BlockingQueue {
    

    定義からDelayedWorkQueueはブロックされたキューであることがわかります.
    2.1重要メンバー属性
           //    ,      。
            private static final int INITIAL_CAPACITY = 16;
            //              。
            private RunnableScheduledFuture>[] queue =
                new RunnableScheduledFuture>[INITIAL_CAPACITY];
            //   lock            。
            private final ReentrantLock lock = new ReentrantLock();
            //           
            private int size = 0;
    
            //           
            private Thread leader = null;
            
            //              ,             ,        
            private final Condition available = lock.newCondition();
    

    DelayedWorkQueueはキュー内の要素を配列で格納します.では、優先キューをどのように実現するかを見てみましょう.
    2.2要素並べ替えsiftUpメソッドの挿入
            private void siftUp(int k, RunnableScheduledFuture> key) {
                //  k==0 ,            ,    
                while (k > 0) {
                    //        ,    (k - 1) / 2
                    int parent = (k - 1) >>> 1;
                    //          
                    RunnableScheduledFuture> e = queue[parent];
                    //   key           ,    ,      
                    //           。
                    if (key.compareTo(e) >= 0)
                        break;
                    //             k  
                    queue[k] = e;
                    //         ScheduledFutureTask       ,        。
                    setIndex(e, k);
                    //     k,    key              
                    k = parent;
                }
                //     ,k    key         
                queue[k] = key;
                setIndex(key, k);
            }
    

    ループにより、エレメントkeyがスタックツリーのノード位置に挿入され、親ノードの位置がインタラクティブになるように検索されます.具体的な流れは先に紹介した.
    2.3要素ソートsiftDownの除去方法
         private void siftDown(int k, RunnableScheduledFuture> key) {
                int half = size >>> 1;
                //     ,              。
                while (k < half) {
                    //     ,     (k * 2) + 1
                    int child = (k << 1) + 1;
                    //         
                    RunnableScheduledFuture> c = queue[child];
                    //     ,     (k * 2) + 2
                    int right = child + 1;
                    //                   ,               。
                    //    c child     
                    if (right < size && c.compareTo(queue[right]) > 0)
                        c = queue[child = right];
                    //                    ,       
                    if (key.compareTo(c) <= 0)
                        break;
                    //   ,               
                    queue[k] = c;
                    setIndex(c, k);
                    k = child;
                }
                queue[k] = key;
                setIndex(key, k);
            }
    

    ループにより、親ノードの値が子ノードより小さくならないことを保証します.
    2.4要素の挿入方法
            public void put(Runnable e) {
                offer(e);
            }
    
            public boolean add(Runnable e) {
                return offer(e);
            }
    
            public boolean offer(Runnable e, long timeout, TimeUnit unit) {
                return offer(e);
            }
    

    通常のブロックキューと比較して,この3つの追加方法はofferメソッドを呼び出すことであることが分かった.それはキューがいっぱいになっている条件がないため、つまりDelayedWorkQueueに要素をどんどん追加することができ、要素の個数が配列の長さを超えると配列拡張が行われます.
    public boolean offer(Runnable x) {
                if (x == null)
                    throw new NullPointerException();
                RunnableScheduledFuture> e = (RunnableScheduledFuture>)x;
                //   lock        
                final ReentrantLock lock = this.lock;
                lock.lock();
                try {
                    int i = size;
                    //          ,        
                    if (i >= queue.length)
                        //     
                        grow();
                    //           
                    size = i + 1;
                    //         ,        ,       
                    if (i == 0) {
                        queue[0] = e;
                        setIndex(e, 0);
                    } else {
                        //   siftUp  ,          。
                        siftUp(i, e);
                    }
                    //             ,      ,
                    //                  。
                    if (queue[0] == e) {
                        leader = null;
                        //                
                        available.signal();
                    }
                } finally {
                    lock.unlock();
                }
                return true;
            }
    

    主に3つのステップです.
  • 要素の個数が配列長を超えるとgrow()メソッドが呼び出され,配列拡張が行われる.
  • は、新しい要素eを優先順位キュー内の対応する位置に追加し、siftUpメソッドにより、要素の優先順位に従ってソートすることを保証する.
  • 新しく挿入された要素がキューヘッダである場合、すなわちキューヘッダが交換された場合、タスクの取得を待機しているスレッドが呼び出されます.これらのスレッドは、元のキューヘッダ要素の遅延時間が来ていないため、待機している可能性があります.

  • 配列拡張方法:
          private void grow() {
                int oldCapacity = queue.length;
                //                。
                int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
                if (newCapacity < 0) // overflow
                    newCapacity = Integer.MAX_VALUE;
                //   Arrays.copyOf        
                queue = Arrays.copyOf(queue, newCapacity);
            }
    

    2.5キューヘッダ要素の取得
    2.5.1キューヘッダ要素をすぐに取得
          public RunnableScheduledFuture> poll() {
                final ReentrantLock lock = this.lock;
                lock.lock();
                try {
                    RunnableScheduledFuture> first = queue[0];
                    //       null,           ,   null
                    if (first == null || first.getDelay(NANOSECONDS) > 0)
                        return null;
                    else
                        //        
                        return finishPoll(first);
                } finally {
                    lock.unlock();
                }
            }
    

    キューヘッダタスクがnullであるか、タスクの遅延時間が来ていない場合は、このタスクはまだ戻ってこないためnullに直接戻ります.そうでなければfinishPollメソッドを呼び出し、キューヘッダ要素を除去して返します.
            //        
            private RunnableScheduledFuture> finishPoll(RunnableScheduledFuture> f) {
                //           
                int s = --size;
                //         x
                RunnableScheduledFuture> x = queue[s];
                //           null
                queue[s] = null;
                if (s != 0)
                    //           ,        。
                    siftDown(0, x);
                setIndex(f, -1);
                return f;
            }
    

    この方法は,第1節で説明したスタックの削除方法と同様である.
  • キュー内の要素の数を1つ減らします.
  • 元のキュー末尾要素をキューヘッダ要素に設定し、キュー末尾要素をnullに設定します.
  • siftDown(0,x)メソッドを呼び出し、要素の優先順位に従ってソートすることを保証します.

  • 2.5.2キューヘッダ要素の取得待ち
    public RunnableScheduledFuture> take() throws InterruptedException {
                final ReentrantLock lock = this.lock;
                lock.lockInterruptibly();
                try {
                    for (;;) {
                        RunnableScheduledFuture> first = queue[0];
                        //       ,     available     。
                        if (first == null)
                            available.await();
                        else {
                            //            
                            long delay = first.getDelay(NANOSECONDS);
                            //         ,       ,    。
                            if (delay <= 0)
                                return finishPoll(first);
                            //  first   null,      ,   first   
                            first = null; // don't retain ref while waiting
    
                            //                   ,
                            //                 ,    。
                            if (leader != null)
                                available.await();
                            else {
                                //                 
                                Thread thisThread = Thread.currentThread();
                                leader = thisThread;
                                try {
                                    //            ,        。
                                    available.awaitNanos(delay);
                                } finally {
                                    if (leader == thisThread)
                                        leader = null;
                                }
                            }
                        }
                    }
                } finally {
                    if (leader == null && queue[0] != null)
                        //          
                        available.signal();
                    lock.unlock();
                }
            }
    

    キューにタスクがない場合は、現在のスレッドをavailable条件下で待機させます.キューヘッダタスクの残りの遅延時間delayが0より大きい場合、現在のスレッドはavailable条件下でdelay時間を待つ.
    キューに新しいキューヘッダが挿入されている場合、残りの遅延時間は元のキューヘッダの時間よりも小さいに違いありません.このとき、タスクが取得できるかどうかを確認するために待機スレッドを呼び出します.
    2.5.3タイムアウト待ちキューヘッダ要素の取得
            public RunnableScheduledFuture> poll(long timeout, TimeUnit unit)
                throws InterruptedException {
                long nanos = unit.toNanos(timeout);
                final ReentrantLock lock = this.lock;
                lock.lockInterruptibly();
                try {
                    for (;;) {
                        RunnableScheduledFuture> first = queue[0];
                        //       。
                        if (first == null) {
                            //       ,       null
                            if (nanos <= 0)
                                return null;
                            else
                                //        available     nanos  
                                nanos = available.awaitNanos(nanos);
                        } else {
                            //            
                            long delay = first.getDelay(NANOSECONDS);
                            //         ,       ,    。
                            if (delay <= 0)
                                return finishPoll(first);
                            //         ,       null
                            if (nanos <= 0)
                                return null;
                            //  first   null,      ,   first   
                            first = null; // don't retain ref while waiting
                            //                  ,            。
                            //             nanos
                            if (nanos < delay || leader != null)
                                nanos = available.awaitNanos(nanos);
                            else {
                                Thread thisThread = Thread.currentThread();
                                leader = thisThread;
                                try {
                                    //            ,        。
                                    long timeLeft = available.awaitNanos(delay);
                                    //          
                                    nanos -= delay - timeLeft;
                                } finally {
                                    if (leader == thisThread)
                                        leader = null;
                                }
                            }
                        }
                    }
                } finally {
                    if (leader == null && queue[0] != null)
                        //          
                        available.signal();
                    lock.unlock();
                }
            }
    

    takeメソッドと比較して、設定したタイムアウト時間を考慮し、タイムアウト時間が来ても役に立つタスクが取得されていない場合はnullを返します.他はtakeメソッドの論理と同じです.
    三.まとめ
    優先キューDelayedWorkQueueを使用して、キューに追加されたタスクは、タスクの遅延時間に従ってソートされ、遅延時間の少ないタスクが最初に取得されることを保証します.