Jdk 1.6 JUCソースコード解析(16)-FutureTask


詳細
Jdk 1.6 JUCソースコード解析(16)-FutureTask
作者:大飛
 
機能の概要:
  • FutureTaskは非同期タスク(または非同期計算)であり、栗を挙げると、メインスレッドの論理ではある値を使用する必要があるが、この値は複雑な演算が必要であり、メインスレッドは事前に非同期タスクを確立してこの値(他のスレッドで計算)を計算し、それから他のことをすることができる.この値が必要になったときに、さっき確立した非同期タスクでこの値を取得すると、少し並列になるという意味で、プライマリスレッドロジック全体の実行時間を短縮することができます.
  • FutureTaskもAQSに基づいて構築され、共有モードを使用し、AQSの状態を使用して非同期タスクの実行状態を表す.

  • ソース分析:
  • まずFutureTaskが実現したインタフェースを見てみましょう.まずRunnableFutureインタフェースを実現し、まずこのインタフェースを見てください:
  • public interface RunnableFuture extends Runnable, Future {
        /**
         * Sets this Future to the result of its computation
         * unless it has been cancelled.
         */
        void run();
    }

     
           RunnableFutureはまたRunnableとFutureを拡張しました:
     
    public interface Runnable {
        /**
         * When an object implementing interface Runnable is used 
         * to create a thread, starting the thread causes the object's 
         * run method to be called in that separately executing 
         * thread. 
         * 

    * The general contract of the method run is that it may * take any action whatsoever. * * @see java.lang.Thread#run() */ public abstract void run(); }


     
    public interface Future {
        /**
         *          ,                       
         *     ,    false。        ,            
         *      ,            。mayInterruptIfRunning     
         *              。
         */
        boolean cancel(boolean mayInterruptIfRunning);
        /**
         *               。
         */
        boolean isCancelled();
        /**
         *         。
         */
        boolean isDone();
        /**
         *   ,           。         ,       。
         */
        V get() throws InterruptedException, ExecutionException;
        /**
         *   ,                。
         */
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }

           Runnableインタフェースはマルチスレッドプログラムをよく書くときっとよく知っていると思いますが、ここでは言いません.Futureインタフェースを参照すると、タスクの取り消しインタフェースが提供され、タスクのステータスを表示するインタフェースが提供されます.最も重要なのは、ブロックされた動作を持つタスクの実行結果を取得するインタフェースが提供されます.
     
  • 次にFutureTaskの実装を見てみましょう.AQSに基づいて実装されているため、内部の同期メカニズムを見てみましょう.
  •     private final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = -7828117401763700385L;
            /**          */
            private static final int RUNNING   = 1;
            /**            */
            private static final int RAN       = 2;
            /**         */
            private static final int CANCELLED = 4;
            /**    callable */
            private final Callable callable;
            /**      */
            private V result;
            /**            */
            private Throwable exception;
            /**
             *          。 set/cancel    ,    
             *  。    volatile   ,            。 
             */
            private volatile Thread runner;
            Sync(Callable callable) {
                this.callable = callable;
            }

     
           内部同期器接続では、実行するタスクを保存するためにcallableが使用されています.このインタフェースを見てください. 
    public interface Callable {
        /**
         * Computes a result, or throws an exception if unable to do so.
         *
         * @return computed result
         * @throws Exception if unable to compute a result
         */
        V call() throws Exception;
    }

           このインタフェースの動作はRunnableと類似しており,異なるのは戻り値があり,異常を投げ出すことができ,Runnableへの補完である.
     
           引き続き、FutureTaskのrunメソッドをサポートするための同期器のinnerRunメソッドを見てみましょう. 
            void innerRun() {
                //               。
                if (!compareAndSetState(0, RUNNING))
                    return; //      ,    。
                try {
                    runner = Thread.currentThread(); //      。
                    if (getState() == RUNNING) //        
                        innerSet(callable.call()); //    ,        。
                    else
                        releaseShared(0); //       。
                } catch (Throwable ex) {
                    innerSetException(ex); //             ,    。
                }
            }

     
           実行結果を設定するinnerSetメソッドを見てください. 
            void innerSet(V v) {
    	       for (;;) {
    		      int s = getState(); //        。
    		      if (s == RAN)
    		          return; //          ,  。
                  if (s == CANCELLED) {
    		          //    AQS      runner null, 
    		          //                     
                      releaseShared(0);
                      return;
                  }
                  //              。
    		      if (compareAndSetState(s, RAN)) {
                      result = v; //      。
                      releaseShared(0); //  AQS   。
                      done(); //      done  ,         ,       。
    		          return;
                  }
                }
            }

     
           AQSでは、releaseSharedメソッドでtryReleaseSharedメソッドが呼び出され、現在の同期器でこのメソッドの実装を見てみましょう. 
            protected boolean tryReleaseShared(int ignore) {
                runner = null;
                return true;
            }

     
           InnerRunメソッドでは、投げ異常を実行するとinnerSetExceptionが呼び出されます.
            void innerSetException(Throwable t) {
    	        for (;;) {
    		        int s = getState();
    		        if (s == RAN)
    		            return;
                    if (s == CANCELLED) {
    		            // aggressively release to set runner to null,
    		            // in case we are racing with a cancel request
    		            // that will try to interrupt runner
                        releaseShared(0);
                        return;
                     }
    		         if (compareAndSetState(s, RAN)) {
                        exception = t;
                        result = null;
                        releaseShared(0);
                        done();
    		            return;
                     }
    	         }
            }

           プロシージャはinnerSetと似ていますが、最後に例外を設定し、resultを空にします.
           InnerRunと同様にinnerRunAndResetメソッドもあります.実装を見てください. 
            boolean innerRunAndReset() {
                if (!compareAndSetState(0, RUNNING))
                    return false;
                try {
                    runner = Thread.currentThread();
                    if (getState() == RUNNING)
                        callable.call(); // don't set result
                    runner = null;
                    return compareAndSetState(RUNNING, 0);
                } catch (Throwable ex) {
                    innerSetException(ex);
                    return false;
                }
            }

           InnerRunとの違いは、実行結果を設定せず、最後に実行が完了すると非同期タスクステータスを0にリセットします.
     
           同期器のinnerGetメソッドを見てみましょう.このメソッドはFutureTaskのgetメソッドをサポートするために使用されます. 
            V innerGet() throws InterruptedException, ExecutionException {
                //     ,         。
                acquireSharedInterruptibly(0);
                if (getState() == CANCELLED)
                    throw new CancellationException(); //         ,    CancellationException
                if (exception != null)
                    throw new ExecutionException(exception);//        ,  ExecutionException,     。
                return result; //      ,      。
            }

     
           タイムアウトのあるinnerGetのようなもの:
     
            V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
                if (!tryAcquireSharedNanos(0, nanosTimeout))
                    throw new TimeoutException();
                if (getState() == CANCELLED)
                    throw new CancellationException();
                if (exception != null)
                    throw new ExecutionException(exception);
                return result;
            }

     
           AQS分析では、acquireSharedInterruptiblyメソッドとtryAcquireSharedNanosメソッドでtryAcquireSharedメソッドが呼び出されます.現在の同期器でこのメソッドの実装を見てみましょう.
            protected int tryAcquireShared(int ignore) {
                return innerIsDone()? 1 : -1;
            }
            boolean innerIsDone() {
                return ranOrCancelled(getState()) && runner == null;
            }
            private boolean ranOrCancelled(int state) {
                return (state & (RAN | CANCELLED)) != 0;
            }

           innerGetでは、まずタスクが完了したかどうかを判断し、タスク(完了またはキャンセル)の状態に基づいて判断することがわかります.
     
           最後に、FutureTaskのcancelメソッドをサポートする同期器のinnerCancelメソッドを見てみましょう.
            boolean innerCancel(boolean mayInterruptIfRunning) {
    	        for (;;) {
    		        int s = getState();
    		        if (ranOrCancelled(s))
    		            return false; //              。
    		        if (compareAndSetState(s, CANCELLED))//             。
    		            break;
    	        }
                if (mayInterruptIfRunning) {
                    Thread r = runner;
                    if (r != null)
                        r.interrupt(); //     mayInterruptIfRunning true,      ,
                }
                releaseShared(0); //  AQS    。
                done(); //      done,          。
                return true;
            }

       
  • 内部同期メカニズムがあり、FutureTaskの実現が容易になりました.コードを見てください.
  • public class FutureTask implements RunnableFuture {
        /**       */
        private final Sync sync;
    
        public FutureTask(Callable callable) {
            if (callable == null)
                throw new NullPointerException();
            sync = new Sync(callable);
        }
    
        public FutureTask(Runnable runnable, V result) {
            sync = new Sync(Executors.callable(runnable, result));
        }
        public boolean isCancelled() {
            return sync.innerIsCancelled();
        }
        public boolean isDone() {
            return sync.innerIsDone();
        }
        public boolean cancel(boolean mayInterruptIfRunning) {
            return sync.innerCancel(mayInterruptIfRunning);
        }
    
        public V get() throws InterruptedException, ExecutionException {
            return sync.innerGet();
        }
    
        public V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            return sync.innerGet(unit.toNanos(timeout));
        }
        /**
         *          ,          ,          。
         *                        。
         */
        protected void done() { }
    
        protected void set(V v) {
            sync.innerSet(v);
        }
    
        protected void setException(Throwable t) {
            sync.innerSetException(t);
        }
    
        public void run() {
            sync.innerRun();
        }
    
        protected boolean runAndReset() {
            return sync.innerRunAndReset();
        }
        ...

     
           実装はすべて上で分析した方法に基づいており,ここではくどくどしない.注意構築方法にはRunnableからcallableへの変換があり、Executorsの方法を使用しています.このクラスは後で分析されます.ここで簡単に見てください.
        public static  Callable callable(Runnable task, T result) {
            if (task == null)
                throw new NullPointerException();
            return new RunnableAdapter(task, result);
        }
        static final class RunnableAdapter implements Callable {
            final Runnable task;
            final T result;
            RunnableAdapter(Runnable  task, T result) {
                this.task = task;
                this.result = result;
            }
            public T call() {
                task.run();
                return result;
            }
        }

           単純に適合クラスを介して適合RunnableとCallableに適合する.
     
           
    まとめてみます.
                  1.現在のスレッドが非同期タスクを確立した後、非同期タスクは初期状態(内部に数値表示状態があり、初期は0)にあり、一般的に他のスレッドによって実行されるタスク(例えばスレッドプール処理にコミットされる)に渡される.現在のスレッドが非同期タスクのgetメソッドで実行結果を取得する場合、非同期タスクがまだ実行されていない場合(内部ステータスが完了でもキャンセルでもない場合)、現在のスレッドはgetメソッドでブロックされます.
                  2.スレッドプール内の作業スレッドなどの他のスレッドが非同期タスクを実行すると、非同期タスクのステータスが「完了」(場合によってはキャンセル)に変更され、getで待機しているスレッドを除いて起動されます.
     
           FutureTaskのコード解析完了!
     
     
           参照:Jdk 1.6 JUCソース解析(6)-locks-AbstractQueuedSynchronizer