FutureとFutureTask


Future
Java DOCドキュメントから:Futureは非同期計算の結果を表します.計算が完了するかどうかを確認する方法を提供し、計算の完了を待って計算の結果を取得します.計算が完了したらgetメソッドのみを使用して結果を取得できます.必要に応じて、計算が完了する前にこのメソッドをブロックできます.キャンセルはcancelメソッドによって実行されます.タスクが正常に完了したかキャンセルされたかを決定する他の方法も提供されています.計算が完了すると、計算をキャンセルすることはできません.消去可能性のためにFutureを使用しても使用可能な結果が得られない場合は、Futureを宣言できます.形式タイプでnullを最下位タスクの結果として返します.
つまりFutureは次のような特性を持っています.
  • は非同期で実行され、getメソッドで実行結果を取得できます.
  • 計算がまだ完了していない場合、getメソッドはブロックされ、完了した場合、複数回取得し、すぐに結果を得ることができます.
  • 計算がまだ完了していない場合は、計算をキャンセルできます.
  • は、計算の実行ステータスを問い合わせることができる.

  • 2つの小さな問題を埋め込んでFutureをどのように実現するかを想定します.
  • Future計算完了前にgetアクセスをブロックし、完了後に自由にアクセスできます.getメソッドを実装するにはどうすればいいですか?
  • 計算のキャンセルはどのように実現されますか?キャンセルされた計算は実行を終了しますか?

  • FutureTask
    FutureTaskはJUCでのFutureの実装であり、Runnableインタフェースも実装されているので、スレッドプールにコミットして実行することができます.
    FutureTaskには、カスタム同期器Syncのプロパティが1つしかありません.すべての方法は、この同期器に委任されて実装されます.これもJUCでAQSを使う汎用モードです.
    次のコードはJDK 1.7です.10のソースコード.
    FutureTask実装
    FutureTaskの定義は、多くのコードを省略しています.
      public class FutureTask<V> implements RunnableFuture<V> {
        // FutureTask       
        private final Sync sync;
    
        public boolean isCancelled() {
            return sync.innerIsCancelled();
        }
    
        public boolean isDone() {
            return sync.innerIsDone();
        }
    
        public V get() throws InterruptedException, ExecutionException {
            return sync.innerGet();
        }
    
        //       
    }
    

    FutureTask

    Future , , , AQS 。

    FutureTask AQS 。AQS ( ) , 。

      private final class Sync extends AbstractQueuedSynchronizer {
        //              。            ,        2  。
    
        //           ,    
        private static final int READY     = 0;
    
        //          
        private static final int RUNNING   = 1;
    
        //          
        private static final int RAN       = 2;
    
        //        
        private static final int CANCELLED = 4;
    
    
        //              
        private final Callable<V> callable;
    
        //         ,  get    。
        private V result;
    
        //           ,  get       。
        private Throwable exception;
    
         /*
         *          。  set/cancel       ,       。
         *     volatile ,       (result exception)    。
         * (  runner  volatile, result exception    volatile )
         */
        private volatile Thread runner;
    
    
         /**
         *              
         */
        protected int tryAcquireShared( int ignore) {
            return innerIsDone() ? 1 : -1;
        }
    
        /**
         *            AQS    ,    runner    。
         *          AQS state  ,
         *          volatile runner      。
         */
        protected boolean tryReleaseShared( int ignore) {
            runner = null;
            return true;
        }
    
    
         //        
        void innerRun() {
            //             
            if (!compareAndSetState(READY, RUNNING))
                return;
    
            //   Future       ,  runner          。
            runner = Thread.currentThread();
    
            //            ,             
            //      CAS      RUNNING,
            if (getState() == RUNNING) { // recheck after setting thread
                V result;
                //
                try {
                    result = callable.call();
                } catch (Throwable ex) {
                    //                 
                    setException(ex);
                    return;
                }
                set(result);
            } else {
          //        
                releaseShared(0); // cancel
            }
        }
    
        //     
        void innerSet(V v) {
            //                。
            for (;;) {
                // AQS    ,       0,        READY   。
                int s = getState();
    
                //            
                if (s == RAN)
                    return;
    
                //              
                if (s == CANCELLED) {
                    // releaseShared    runner  ,
                    //                      runner
                    releaseShared(0);
                    return;
                }
    
                //       ,      
                if (compareAndSetState(s, RAN)) {
                    result = v;
                    releaseShared(0); //        runner,  result    
                    done();
                    return;
                }
            }
        }
    
        //          
        V innerGet() throws InterruptedException, ExecutionException {
            acquireSharedInterruptibly(0);//     ,          。
    
            //        
            if (getState() == CANCELLED)
                throw new CancellationException();
    
            //            
            if (exception != null)
                throw new ExecutionException(exception);
    
            return result;
        }
    
        //       
        boolean innerCancel( boolean mayInterruptIfRunning) {
            for (;;) {
                int s = getState();
    
                //                 
                if (ranOrCancelled(s))
                    return false;
    
                //      READY   RUNNING
                if (compareAndSetState(s, CANCELLED))
                    break;
            }
            //      ,      
            if (mayInterruptIfRunning) {
                Thread r = runner;
                if (r != null)
                    r.interrupt();
            }
            releaseShared(0); //             
            done();
            return true;
        }
    
        /**
         *                
         */
        private boolean ranOrCancelled( int state) {
            return (state & (RAN | CANCELLED)) != 0;
        }
    
         //       
    }
    

    innerCancel , 。 , , get , 。

    ,Future 。