Javaタスクスケジューリングスレッド池ScheduledThreadPool Exector原理解析


ScheduledThreadPoolExectorは、JDKがThreadPool Exectorに基づいて実行するタスクスケジュールスレッド池である。ScheduledThreadPoolExectorのコンストラクタは全部父(つまりThreadPool Exector)を呼び出すコンストラクタです。なお、コアスレッド数は必須であり、最大スレッド数はInteger.MAX_である。VALE、空き作業スレッドの生存時間は0で、列を塞ぐのはDelayedWorkQueです。DelayedWorkQue内部では初期容量16の配列を使ってタスクを保存していますが、容量が足りないと拡大されますので、DelayedWork Queは無境界列と考えられます。最大スレッド数の設定も意味がありません。
ThreadPool Exectorの詳細な説明については、下記を参照してください。http://blog.csdn.net/u011983531/article/details/49369489
//    
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
          new DelayedWorkQueue());
}
ScheduledThreadPoolExectorのコンストラクタはすべて親類を使用しているのですが、どのようにタイミングを調整するのですか?ScheduledThreadPool ExectorとThreadPool Exectorとの違いは主に次の2つです。
  • ジョブが異なります。ScheduledThreadPoolExectorのタスク統合はScheduledFutureTaskオブジェクトにパッケージされていますが、ThreadPool Exectorが実行するのは元のRunnableのオブジェクトです。
  • 遮断行列が違います。ScheduledThreadPoolExectorはDelayed WorkQueを使っています。名前の通り、これは遅延行列です。
  • scheduleAt FixedRate()方法を例にとって、具体的にどのように実現されているかを確認します。scheduleAt FixedRateの大体のロジックは以下の通りです。
  • は、ScheduledFutureTaskオブジェクト
  • にタスクをカプセル化する。
  • は、SchduledFutureTaskオブジェクトを遅延列に配置する
  • /**
     *     :
     * 1.    ScheduledFutureTask  
     * 2.  delayedExecute()  
     * /
    public ScheduledFuture> scheduleAtFixedRate(Runnable command,
                                                 long initialDelay,
                                                 long period,
                                                 TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask sft =
            new ScheduledFutureTask(command, null,
    			triggerTime(initialDelay, unit),
    			unit.toNanos(period));
        RunnableScheduledFuture t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }
    
    /**
     *     :
     * 1. task      
     * /
    private void delayedExecute(RunnableScheduledFuture> task) {
        if (isShutdown())
            reject(task);
        else {
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }
    
    ですから、次の一番重要なのは遅延列のDelayedWorkQueのofferとtake方法です。どうやって実現されたのかを見に来ました。DelayedWorkQue内部では、行列を使ってタスクの隊列を維持しますが、配列はどのようにしてタスクの順序を保証しますか?コードをよく見ると、ここの実装は二叉積で配列要素を並べ替えることです。正確には頂上の山です。一番小さい山は何を基準に並べられていますか?ScheduledFutureTaskはComprableインターフェースを実現したので、タスク実行時間によって逆さにソートされます。
    //      ,         ,            ,   ,
    //      index 0   ,          。             
    //   ,             ,    signal
    public boolean offer(Runnable x) {
        if (x == null)
            throw new NullPointerException();
        RunnableScheduledFuture e = (RunnableScheduledFuture)x;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = size;
            if (i >= queue.length)
    	        //  
                grow();
            size = i + 1;
            if (i == 0) {
                queue[0] = e;
                setIndex(e, 0);
            } else {
    	        //              ,              
                siftUp(i, e);
            }
            if (queue[0] == e) {
                leader = null;
                available.signal();
            }
        } finally {
            lock.unlock();
        }
        return true;
    }
    
    //    ,take     queue[0],                ,
    //           ,      ,    0,      ,
    //         ,        。     0,         ,
    //  available.awaitNanos(delay);  delay       ,
    //                 offer  signal    ,    ,
    //              ,  delay  0,        。
    public RunnableScheduledFuture take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                RunnableScheduledFuture first = queue[0];
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0)
    	                //    ,  
                        return finishPoll(first);
                    else if (leader != null)
    	                //  
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && queue[0] != null)
                available.signal();
            lock.unlock();
        }
    }
    
    遅延の実現原理を明らかにしました。次の一番重要なのはサイクルスケジュールの原理です。これはScheduledFutureTaskのrun方法の中で実現したのです。周期的に実行されたかどうかを判断します。そうでない場合は、先に実行し、次の実行時間を計算して、再度遅延待ち行列にタスクを追加します。
    public void run() {
        boolean periodic = isPeriodic();
        if (!canRunInCurrentRunState(periodic))
            cancel(false);
        else if (!periodic)
            ScheduledFutureTask.super.run();
        else if (ScheduledFutureTask.super.runAndReset()) {
            setNextRunTime();
            reExecutePeriodic(outerTask);
        }
    }