『java.util.concurrentパッケージソース読解』15スレッドプールシリーズのScheduledThreadPoolExecutor第2部

16528 ワード

この文章は主にDelayedWorkQueueについて述べている.
ScheduledThreadPoolExecutorではDelayedWorkQueueを使用して実行するタスクを保存します.これらのタスクは遅延があり、実行するたびに最初のタスクを実行するため、DelayedWorkQueueでは必ず遅延時間の短さから長さでソートされます.
DelayedWorkQueueはスタックを使用して実現されます.
BlockingQueueの実装クラスを以前に解析したのと同様に,まずoffer法を見ると,基本的にはスタックに要素を追加する論理である.
        public boolean offer(Runnable x) {

            if (x == null)

                throw new NullPointerException();

            RunnableScheduledFuture e = (RunnableScheduledFuture)x;

            final ReentrantLock lock = this.lock;

            lock.lock();

            try {

                int i = size;

                //              ,     ,        ,       

                if (i >= queue.length)

                    grow();

                size = i + 1;

                //         

                if (i == 0) {

                    queue[0] = e;



                    //   i  RunnableScheduledFuture   heapIndex

                    setIndex(e, 0);

                } else {

                    //        

                    siftUp(i, e);

                }

                //         ,            ,         

                //  ,     Condition      

                if (queue[0] == e) {

                    //          ,               ,    

                    //          

                    leader = null;

                    available.signal();

                }

            } finally {

                lock.unlock();

            }

            return true;

        }

ここでsiftUpを見てみると、スタックの実現に詳しい友达は、要素を既存のスタックに追加するアルゴリズムであることを簡単に理解できるはずです.
        private void siftUp(int k, RunnableScheduledFuture key) {

            while (k > 0) {

                int parent = (k - 1) >>> 1;

                RunnableScheduledFuture e = queue[parent];

                if (key.compareTo(e) >= 0)

                    break;

                queue[k] = e;

                setIndex(e, k);

                k = parent;

            }

            queue[k] = key;

            setIndex(key, k);

        }

ではpollを見てみましょう
        public RunnableScheduledFuture poll() {

            final ReentrantLock lock = this.lock;

            lock.lock();

            try {

                //         ,        ,                

                //   poll                 

                RunnableScheduledFuture first = queue[0];

                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)

                    return null;

                else

                    return finishPoll(first);

            } finally {

                lock.unlock();

            }

        }

pollメソッドは、実行時点に達したタスクのみを返すため、キューが遅延実行をどのように実現するかを理解するのに意味がないため、takeメソッドに重点を置いてみましょう.
        public RunnableScheduledFuture take() throws InterruptedException {

            final ReentrantLock lock = this.lock;

            lock.lockInterruptibly();

            try {

                for (;;) {

                    //

                    RunnableScheduledFuture first = queue[0];

                    if (first == null)

                        available.await();

                    else {

                        //            

                        long delay = first.getDelay(TimeUnit.NANOSECONDS);

                        //

                        if (delay <= 0)

                            //        

                            return finishPoll(first);

                        //         ,                (leader  

                        //        ,         ,          ),

                        //     take         ,

                        else if (leader != null)

                            available.await();

                        //                ,    leader  

                        //  Condition        ,        

                        else {

                            Thread thisThread = Thread.currentThread();

                            leader = thisThread;

                            try {

                                available.awaitNanos(delay);

                            } finally {

                                //   leader           

                                if (leader == thisThread)

                                    leader = null;

                            }

                        }

                    }

                }

            } finally {

                //        signal    ,       leader  

                //       leader       leader            

                //   ,      signal   awaitNanos    

                if (leader == null && queue[0] != null)

                    available.signal();

                lock.unlock();

            }

        }

takeメソッドのポイントはleaderスレッドです.遅延時間があるので、タスクを手に入れてもスレッドは待つ必要があります.leaderスレッドはその最初にタスクを実行するスレッドです.
スレッドがタスクを取得した後も遅延実行の時間を待つ必要があるため、タイムアウト待ちのpollメソッドには少し意味があります.
        public RunnableScheduledFuture poll(long timeout, TimeUnit unit)

            throws InterruptedException {

            long nanos = unit.toNanos(timeout);

            final ReentrantLock lock = this.lock;

            lock.lockInterruptibly();

            try {

                for (;;) {

                    RunnableScheduledFuture first = queue[0];

                    //          

                    if (first == null) {

                        // nanos    0     :

                        // 1.      

                        // 2.       

                        if (nanos <= 0)

                            return null;

                        else

                            //

                            nanos = available.awaitNanos(nanos);

                    } else {

                        long delay = first.getDelay(TimeUnit.NANOSECONDS);

                        if (delay <= 0)

                            return finishPoll(first);

                        if (nanos <= 0)

                            return null;

                        // leader      nanos  delay    ,

                        //     nanos     ,       delay  

                        //     ,  leader          signal

                        //     ,           delay      

                        if (nanos < delay || leader != null)

                            nanos = available.awaitNanos(nanos);

                        else {

                            Thread thisThread = Thread.currentThread();

                            leader = thisThread;

                            try {

                                long timeLeft = available.awaitNanos(delay);

                                //        

                                nanos -= delay - timeLeft;

                            } finally {

                                if (leader == thisThread)

                                    leader = null;

                            }

                        }

                    }

                }

            } finally {

                if (leader == null && queue[0] != null)

                    available.signal();

                lock.unlock();

            }

        }

上記のコードを分析することによって、基本的にマネージャはDelayedWorkQueueの実行遅延の原理を明らかにしました.
1.実行遅延が短い順から長い順にタスクをスタックに格納する.
2.リーダースレッドを介してタスクを取得したスレッドを所定の時点まで待たせてタスクを実行する.