JDKのFutureソース解析

24388 ワード

zJDKのFutureモードでは、Futureに関するAPIがあります。インターフェースCallable:戻り結果があり、異常なタスクを投げ出す可能性があります。インターフェースRunnable:戻りませんでした。結果インターフェースFuture:非同期実行の結果を表します。クラスFutureTask:Future、Runnableなどのインターフェースを実現することは、非同期的に実行されるタスクです。直接実行することができます。またはCallableに包装して実行します。
1.lable/Runnable
java.lang.Runnableインターフェースは一つのrun方法だけを宣言しました。
@FunctionalInterface
public interface Runnable {
    /**
     * When an object implementing interface Runnable is used
     * to create a thread, starting the thread causes the object's
     * run method to be called in that separately executing
     * thread.
     * 

* The general contract of the method run is that it may * take any action whatsoever. * * @see java.lang.Thread#run() */

public abstract void run(); }
java.util.co ncurrent.rableは一つのCallだけを宣言する方法:
@FunctionalInterface
public interface Callable {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}
これは汎型インターフェースで、call関数が返したタイプは入ってきたvタイプです。Callableは通常、ExectorServiceに合わせて使用されています。Exector Serviceインターフェースでは、いくつかのsubmit方法の重載バージョンを宣言しています。java.util.co ncurrent.ExectorServiceソースをご覧ください。
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future> submit(Runnable task);
2 Futureインターフェース
Futureとは、具体的なRunnableまたはCallableタスクの実行結果をキャンセルし、完了したかどうか調べ、結果を得ることです。必要に応じてget方法で実行結果を取得できます。この方法はタスクが結果を返すまでブロックされます。java.util.co ncurrent.Futureインターフェース:
* @see FutureTask
 * @see Executor
 * @since 1.5
 * @author Doug Lea
 * @param  The result type returned by this Future's {@code get} method
 */
public interface Future {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
Futureインターフェースでは、キャンセルに成功したらtrueに戻り、キャンセルに失敗したらfalseに戻ります。パラメータmayInterruptIfRunningは、実行中のジョブをキャンセルすることができますか?タスクが完了したら、mayInterruptIfRunningがtrueであるかそれともfalseであるかに関わらず、この方法はfalseに戻ります。タスクが実行されている場合、mayInterrupt IfRunningがtrueに設定されている場合、trueに戻り、mayInterrupt IfRunningがfalseに設定されている場合、falseに戻ります。タスクが実行されていない場合、mayInterruptIfRunningがtrueであるかfalseであるかにかかわらず、trueに戻ります。isCarcelledメソッドは、ジョブがキャンセルされたかどうかを示します。正常に完了する前にキャンセルされたら、trueに戻ります。isdoneメソッドは、タスクが完了したかどうかを表し、タスクが完了したら、trueに戻ります。get()方法は実行結果を取得するために用いられ、この方法はブロックを生じ、タスクの実行が完了するまで戻ってきます。get(long timeout,TimeUnit unit)は、実行結果を取得するために使用され、指定された時間内に結果が得られなかったら、そのままnullに戻ります。Futureは三つの機能を提供しています。1)タスクが完了したかどうかを判断します。2)タスクを中断することができます。3)タスク実行結果を取得することができます。
3 FutureTask
3.1 FutureTask類
FutureTaskクラスはRunnableFutureインターフェースを実現しました。
public class FutureTask<V> implements RunnableFuture<V> {
RunnableFutureはRunnableインターフェースとFutureインターフェースを継承しています。
public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}
したがって、FutureTaskはRunnableとしてスレッドによって実行され、またFutureとしてCallableの返却値を得ることができる。
3.2 stateフィールド
volatile修飾されたstateフィールド。
/**
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;
3.3その他の変数
ルナーとwaitersはvolatileタイプです。
/**    */
    private Callable callable;
    /**     */
    private Object outcome; // non-volatile, protected by state reads/writes
    /**        */
    private volatile Thread runner;
    /** get          */
    private volatile WaitNode waiters;
3.4コンストラクタ
FutureTaskには2つのコンストラクタがあります。
public FutureTask(Callable callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
この二つのコンストラクタは、最初のコンストラクタを使用して最後にスレッド実行結果を取得すると、callableの実行結果として異なる。第二のコンストラクタを使うと、最後にスレッドを取得して実行した結果がパラメータの中のresultです。次にFutureTaskのrun方法を見てみます。
3.5 CASツール初期化
// Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long stateOffset;
    private static final long runnerOffset;
    private static final long waitersOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class> k = FutureTask.class;
            stateOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("state"));
            runnerOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("runner"));
            waitersOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("waiters"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
3.6 get方法の待ち行列
static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }
3.7 run方法
public void run() {
//      new,  runner    null,   
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,                                      null,Thread.currentThread()))
            return;
        try {
            Callable c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                //    
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                //      ,   result state
                if (ran)
                    set(result);
            }
        } finally {
            //   runner null,        run()  
            runner = null;
            //   state,      
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
ここで、catch文のsetException(ex)は以下の通りである。
//       state outcome
protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); //   get()       
            finishCompletion();
        }
    }
正常に完成した時、セット。方法は以下の通りです
//     ,  state outcome
protected void set(V v) {
//     ,NEW->COMPLETING->NORMAL
 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
     outcome = v;
     UNSAFE.putOrderedInt(this, stateOffset, NORMAL); 
     //   get       
            finishCompletion();
        }
    }
getメソッドのブロックを起動するスレッド方法は以下の通りです。
//            ,  done,   callable
private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }
この方法はスピン待つstateがINTERUPTED(cancelに対応する方法の終了)になり、中断の終了を待つという方法です。
private void handlePossibleCancellationInterrupt(int s) {
        // It is possible for our interrupter to stall before getting a
        // chance to interrupt us.  Let's spin-wait patiently.
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt

        // assert state == INTERRUPTED;

        // We want to clear any interrupt we may have received from
        // cancel(true).  However, it is permissible to use interrupts
        // as an independent mechanism for a task to communicate with
        // its caller, and there is no way to clear only the
        // cancellation interrupt.
        //
        // Thread.interrupted();
    }
3.8 get方法の解析
get()方法は2つあります。
public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
        //        
            s = awaitDone(false, 0L);
        return report(s);
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }
new状態のFuturetaskのgetはブロックされます。get()方法はawaitDoone方法に関連し、awaitDooneの運行結果をstateに割り当て、最終的にレポート方法はstate値によって相応の値を返します。await DooneはFutreTask全体の運行の核心です。
それでは、awaitdoneの方法を見てみます。
//       ,    /       
private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        //       
        for (;;) {
        //                 
            if (Thread.interrupted()) {
            //        ,   q
                removeWaiter(q);
                //       
                throw new InterruptedException();
            }

            int s = state;
            //          ,   state  
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            //      ,      ,     
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            // NEW,          ,       q
            else if (q == null)
                q = new WaitNode();
            //      ,                
            else if (!queued)
            //       
            //  q.next=waiters,    waiter CAS   q
           queued = UNSAFE.compareAndSwapObject(this,waitersOffset,                                    q.next = waiters, q);
           //       ,      
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                //       
                    removeWaiter(q);
                    return state;
                }
                  get()     ,     
                LockSupport.parkNanos(this, nanos);
            }
            //         
            else
                  //  get()     ,     
                LockSupport.park(this);
        }
    }


private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                          q, s))
                        continue retry;
                }
                break;
            }
        }
    }
futureTaskを参考に実現します。http://www.jianshu.com/p/b765c0d0165d http://www.qingpingshan.com/rjbc/java/306642.html