JavaのFutureモードを徹底的に理解する

62184 ワード

詳細
 
jdkバージョンによって実装方法が異なります
 
1.6 sychonized 1.8は状態の方式を採用し、ここではこの2つの実現方式とfutureの使用シーンを比較する.
 
 
まず前のシーン:もしあなたが突然料理をしたいならば、しかし調理器具がなくて、食材もありません.ネットで調理器具を購入するのは便利で、食材はスーパーで買ったほうが安心です.
実现分析:速达员が食器を送っている间、私たちはきっと暇ではありません.スーパーに行って食材を买うことができます.だから、メインスレッドの中にもう一つのサブスレッドがネットで調理器具を買いに行きます.
しかし,サブスレッド実行の結果は調理器具に戻り,runメソッドには戻り値はない.だから、それが難点で、よく考えなければなりません.
アナログコード1:
package test;

public class CommonCook {

    public static void main(String[] args) throws InterruptedException {
        long startTime = System.currentTimeMillis();
        //         
        OnlineShopping thread = new OnlineShopping();
        thread.start();
        thread.join();  //       
        //            
        Thread.sleep(2000);  //         
        Shicai shicai = new Shicai();
        System.out.println("   :    ");
        //            
        System.out.println("   :      ");
        cook(thread.chuju, shicai);
        
        System.out.println("    " + (System.currentTimeMillis() - startTime) + "ms");
    }
    
    //       
    static class OnlineShopping extends Thread {
        
        private Chuju chuju;

        @Override
        public void run() {
            System.out.println("   :  ");
            System.out.println("   :    ");
            try {
                Thread.sleep(5000);  //       
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("   :    ");
            chuju = new Chuju();
        }
        
    }

    //         
    static void cook(Chuju chuju, Shicai shicai) {}
    
    //    
    static class Chuju {}
    
    //    
    static class Shicai {}
} 

実行結果:
   :  
   :    
   :    
   :    
   :      
    7013ms

マルチスレッドは意味を失っていることがわかります.調理器具が届いている間、私たちは何もできません.対応コードは、joinメソッドを呼び出してプライマリスレッドをブロックすることです.
誰かに聞かれましたが、メインコースを塞がなくてもいいですか???
だめだ!!!
コードから見るとrunメソッドが実行されず,属性chujuは付与されずnullである.言い換えれば、調理器具がなくて、どうやって料理をしますか.
Javaの現在のマルチスレッドメカニズムでは、コアメソッドrunは値を返さない.runメソッドの計算結果を保存するには、計算プロセスがどんなに時間がかかってもrunメソッドの計算が完了するまで待たなければなりません.
このような気まずい状況に直面して、プログラマーは:サブスレッドrunメソッドの計算の間、メインスレッドの中で非同期実行を続けることができますか??
Where there is a will,there is a way!!!
このアイデアの核心はFutureモードであり,Java自身が実現したFutureモードをまず適用する.
シミュレーションコード2:
package test;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureCook {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        long startTime = System.currentTimeMillis();
        //         
        Callable onlineShopping = new Callable() {

            @Override
            public Chuju call() throws Exception {
                System.out.println("   :  ");
                System.out.println("   :    ");
                Thread.sleep(5000);  //       
                System.out.println("   :    ");
                return new Chuju();
            }
            
        };
        FutureTask task = new FutureTask(onlineShopping);
        new Thread(task).start();
        //            
        Thread.sleep(2000);  //         
        Shicai shicai = new Shicai();
        System.out.println("   :    ");
        //            
        if (!task.isDone()) {  //
            System.out.println("   :     ,      (       cancel      )");
        }
        Chuju chuju = task.get();
        System.out.println("   :    ,      ");
        cook(chuju, shicai);
        
        System.out.println("    " + (System.currentTimeMillis() - startTime) + "ms");
    }
    
    //         
    static void cook(Chuju chuju, Shicai shicai) {}
    
    //    
    static class Chuju {}
    
    //    
    static class Shicai {}

}

実行結果:
   :  
   :    
   :    
   :     ,      (       cancel      )
   :    
   :    ,      
    5005ms

速達員が調理器具を送っている間、私たちは暇がなくて、食材を買うことができます.そして、調理器具が届いていないことを知っています.調理器具が届いていないときに、注文をキャンセルしてもいいです.
不思議ですね.ありますか.
次に、2番目のセグメントコードを具体的に分析します.
1)時間のかかるネット通販調理器具のロジックを,Callableのcallメソッドにカプセル化した.
public interface Callable {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

CallableインタフェースはRunnableインタフェースの補完と見なすことができ,callメソッドには戻り値があり,異常を投げ出すことができる.
 
2)CallableインスタンスをパラメータとしてFutureTaskのオブジェクトを生成し,そのオブジェクトをRunnableとしてパラメータ別スレッドとする.
public class FutureTask implements RunnableFuture
public interface RunnableFuture extends Runnable, Future
public interface Future {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

この継承システムのコアインタフェースはFutureである.Futureの核心思想は:1つの方法f、計算過程は非常に時間がかかる可能性があり、fが戻るのを待つのは明らかに賢明ではない.fを呼び出すと、すぐにFutureを返すことができ、Futureというデータ構造によって方法fの計算過程を制御することができる.
ここでの制御は次のとおりです.
getメソッド:計算結果を取得します(計算が完了していない場合は、待たなければなりません)
cancelメソッド:計算が完了していない場合は、計算プロセスをキャンセルできます.
isDoneメソッド:計算済みかどうかを判断
isCancelledメソッド:計算がキャンセルされたかどうかを判断します
これらのインタフェースの設計は完璧で、FutureTaskの実現は簡単ではないことに決まっています.後で話します.
 
3)ステップ3でisDoneメソッドを呼び出してステータスを確認し、taskを直接呼び出す.getメソッドは調理器具を取得しますが、この時はまだ届いていないので、3秒待ちます.最初のコードの実行結果と比較して、ここでは2秒節約しました.これは、宅配便の配達中にスーパーに食材を買いに行ったためで、この2つのことは同じ時間帯に非同期で実行されました.
 
以上の3ステップにより,Java原生Futureモードの最も基本的な応用を完成した.以下、FutureTaskの実装を具体的に分析し、JDK 8の実装を見てから、JDK 6の実装を比較します.
FutureTaskもRunnableである以上、runの方法を見てみましょう.
public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable c = callable; //    callable           
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex); //   call       
                }
                if (ran)
                    set(result); //   call       
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

まずtry文ブロック内の論理を見てみると,runメソッドの主な論理はCallableのcallメソッドを実行し,結果または異常(属性resultの1つ)を保存することである.ここで考えにくいのは,callメソッドから投げ出された異常も保存されていることである.
ここでステータスを表す属性stateは何ですか?
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

FutureTaskをFutureと見なすと、Callableのcallメソッドの実行プロセスを制御し、実行中に自然に状態の変換があります.
1)FutureTaskが新しくできて、stateはNEW状態です.COMPETINGとINTERUPTING用の進行時は、瞬時状態を表し、存在時間が極めて短い(なぜこのような状態を設定するのか??).NORMAL代表は順調に完成した.EXCEPTIONALは実行過程に異常が発生したことを表す.CANCELEEDは実行プロセスがキャンセルされたことを表します.INTERUPTEDが中断されました
2)実行プロセスが順調に完了する:NEW->COMPLETING->NORMAL
3)実行プロセスに異常が発生した:NEW->COMPLETING->EXCEPTIONAL
4)実行プロセスがキャンセルされた:NEW->CANCELELED
5)実行中、スレッド中断:NEW->INTERUPTING->INTERUPTED
コード中の状態判断、CAS操作などの詳細は、ご自身でお読みください.
getメソッドの実装を見てみましょう.
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

getメソッドの論理は簡単で、callメソッドの実行プロセスが完了したら、結果を出します.完了していない場合は、現在のスレッドを保留して待機します.awaitDoneの方法の中の死の循環の論理は、何度も推演すれば理解できます.その中にスレッドを掛ける主な革新はWaitNodeクラスを定義し、複数の待機スレッドをキューに組織することであり、JDK 6の実現とは最大の違いである.
保留中のスレッドが起動するタイミング:
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t); //     
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

以上、JDK 8の大まかな実装ロジックですが、cancel、setなどの方法も、読者自身で読んでください.
JDK 6の実現を見てみましょう.
JDK 6のFutureTaskの基本的な操作はすべて独自の内部クラスSyncによって実現されているが,SyncはAbstractQueuedSynchronizerという鏡出率の極めて高い同時ツールクラスから継承されている
       /** State value representing that task is running */
        private static final int RUNNING   = 1;
        /** State value representing that task ran */
        private static final int RAN       = 2;
        /** State value representing that task was cancelled */
        private static final int CANCELLED = 4;

        /** The underlying callable */
        private final Callable callable;
        /** The result to return from get() */
        private V result;
        /** The exception to throw from get() */
        private Throwable exception;

中の状態は基本的な数だけで、計算結果と異常は別々に保存されています.
        V innerGet() throws InterruptedException, ExecutionException {
            acquireSharedInterruptibly(0);
            if (getState() == CANCELLED)
                throw new CancellationException();
            if (exception != null)
                throw new ExecutionException(exception);
            return result;
        }

このgetメソッドの中でスレッド待ちキューを処理する方法はacquireSharedInterruptiblyメソッドを呼び出して、私の前のいくつかのブログの文章を見た読者はとてもよく知っているはずです.待機スレッドキュー、スレッドの停止、起動などの論理は、ここでは説明しません.分からない場合は、左に曲がってください.
 
最後に、Futureモードから派生したより高度なアプリケーションを見てみましょう.
前のシーンでは、データベース接続を多重化し、高同時性で正常に動作する簡単なデータベース接続プールを自分で書きました.
実装コード1:
package test;

import java.util.concurrent.ConcurrentHashMap;

public class ConnectionPool {

    private ConcurrentHashMap pool = new ConcurrentHashMap();
    
    public Connection getConnection(String key) {
        Connection conn = null;
        if (pool.containsKey(key)) {
            conn = pool.get(key);
        } else {
            conn = createConnection();
            pool.putIfAbsent(key, conn);
        }
        return conn;
    }
    
    public Connection createConnection() {
        return new Connection();
    }
    
    class Connection {}
}

ConcurrentHashMapを使用することで、getConnectionメソッドをsynchronizedに設定する必要がなくなり、複数のスレッドがgetConnectionメソッドを同時に呼び出すとパフォーマンスが大幅に向上します.
完璧なようですが、余分な接続の作成を招く可能性があります.
ある時点で、同時に3つのスレッドがgetConnectionメソッドに入りpoolを呼び出す.containsKey(key)はfalseを返し、3つのスレッドがそれぞれ接続を作成します.ConcurrentHashMapのputメソッドはそのうちの1つしか追加されませんが、2つの余分な接続が生成されます.本格的なデータベース接続であれば、リソースの浪費が大きくなります.
したがって,マルチスレッドがgetConnectionメソッドにアクセスする際にcreateConnectionを1回だけ実行する方法が現在の難点である.
以前のFutureモードの実装解析と組み合わせると、3つのスレッドが接続を作成する場合、1つのスレッドだけがcreateConnectionメソッドを実行して接続を作成する場合、他の2つのスレッドはこの接続だけでよい.さらに延長し、createConnectionメソッドをCallableのcallメソッドに入れ、FutureTaskを生成します.1つのスレッドにFutureTaskのrunメソッドを実行させるだけで,他のスレッドはgetメソッドのみを実行すればよい.
上のコード:
package test;

import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class ConnectionPool {

    private ConcurrentHashMap> pool = new ConcurrentHashMap>();

    public Connection getConnection(String key) throws InterruptedException, ExecutionException {
        FutureTask connectionTask = pool.get(key);
        if (connectionTask != null) {
            return connectionTask.get();
        } else {
            Callable callable = new Callable() {
                @Override
                public Connection call() throws Exception {
                    return createConnection();
                }
            };
            FutureTask newTask = new FutureTask(callable);
            connectionTask = pool.putIfAbsent(key, newTask);
            if (connectionTask == null) {
                connectionTask = newTask;
                connectionTask.run();
            }
            return connectionTask.get();
        }
    }

    public Connection createConnection() {
        return new Connection();
    }

    class Connection {
    }
}

推論:3つのスレッドがelse文ブロックに同時に入ると、それぞれFutureTaskが作成されますが、ConcurrentHashMapはそのうちの1つしか追加されません.最初のスレッドはpoolを実行する.putIfAbsentメソッドはnullを返し、connectionTaskが割り当てられ、runメソッドを実行して接続を作成し、最後にgetします.後のスレッドはpoolを実行する.putIfAbsentメソッドはnullを返さず、getメソッドのみが実行されます.
同時環境では,FutureTaskを中間変換として,ある方法を1つのスレッドのみで実行することに成功した.
これだけにしましょう.本当に心血を注いでいますね.ははは