JavaマルチスレッドのFutureTask:戻り値を持つ関数定義と呼び出し方法


FutureTask戻り値の関数定義と呼び出し
Runnableインターフェースを使って定義されたタスクには戻り値がありません。多くの場合、私たちは戻り値を持っています。この問題を解決するために、JavaはCallableインターフェースを提供して、指定されたタイプの値を返します。
しかし、このインターフェース自体は実行能力を備えていないので、Javaには、Callableインターフェースを用いて返却値を定義するFutureTask類がある。
使用例
以下のコードは定義と呼び出しの全プロセスを示しています。

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
public class FutureTaskDemo {
	public static void test2() throws Execution{
	    //    Lambda   Callable   , new FutureTask  Lambda     Callable     
		FutureTask<Integer> task = new FutureTask<>(() -> {
			int t = 0;
			for (int i = 0; i < 10; i++)
				t += i;
			return t;
		}); 
		
		//   Thread   task
		System.out.println("Start calling.");
		long t1 = System.nanoTime();
		new Thread(task).start();
		long result = task.get();
		long t2 = System.nanoTime();
		System.out.println("Finish calling.");
		System.out.printf("Result: %d, Time: %.3f ms.
", result, (t2 - t1) / 1000000f); } }
実行後の出力:
Start caling.
Finish caling.
Result:45、Time:13.620 ms。
JavaマルチスレッドFutureTaskの使い方と解析
1 FutureTask概念
FutureTaskはキャンセル可能な非同期計算であり、FutureTaskはFutureの基本的な方法を実現し、start cancelを持って動作し、計算が完了したかどうかを調べることができ、計算の結果を得ることができる。
結果は計算が完了した後だけ取得できます。get方法はブロックされます。計算が完了していない場合、計算が完了したら、再起動やキャンセルはできません。
一つのFutureTaskは、Callableまたはrunnableオブジェクトを包装するために使用できます。FutureTaskはRunnable方法を実現しているので、一つのFutureTaskはExcutorに実行してもいいです。
2 FutureTask使用シーン
FutureTaskは、実行結果を非同期的に取得したり、ジョブの実行をキャンセルしたりするシーンに使用することができる。
RunnableまたはCallableに着信したジョブをFutureTaskに直接呼び出し、またはスレッド池に入れて実行した後、外部でFutureTaskのget方法によって実行結果を非同期的に取得することができるので、FutureTaskは時間のかかる計算に非常に適しており、メインスレッドは自分のタスクを完了した後、結果を取得することができる。
また、FutureTaskは、たとえ複数のrun方法を呼び出しても、RunnableまたはCallableタスクを1回だけ実行するか、またはcancelによってFutureTaskの実行をキャンセルするかなどを確保することができる。
2.1 FutureTaskがマルチタスク計算を実行する使用シーン
FutureTaskとExectorServiceを利用して、マルチスレッドで計算タスクを提出することができ、主スレッドは他のタスクを継続して実行し、主スレッドがサブスレッドの計算結果を必要とする場合、非同期でサブスレッドの実行結果を取得する。

public class FutureTest1 {
	public static void main(String[] args) {
		Task task = new Task();//       
		FutureTask<Integer> future = new FutureTask<Integer>(task) {
			//         ,  
			@Override
			protected void done() {
				try {
					System.out.println("future.done():" + get());
				} catch (InterruptedException e) {
					e.printStackTrace();
				} catch (ExecutionException e) {
					e.printStackTrace();
				}
			}
		};
		//      (         )
		ExecutorService executor = Executors.newCachedThreadPool();
		executor.execute(future);
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e1) {
			e1.printStackTrace();
		}
		//         
		// future.cancel(true);
		try {
			//   ,          -          
			System.out.println("future.get():" + future.get());
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		}
	}
	//     
	static class Task implements Callable<Integer> {
		//            
		@Override
		public Integer call() throws Exception {
			int i = 0;
			for (; i < 10; i++) {
				try {
					System.out.println(Thread.currentThread().getName() + "_"
							+ i);
					Thread.sleep(500);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			return i;
		}
	}
}
这里写图片描述
2.2 FutureTask高併合環境下でタスクが一回だけ実行されることを確保する。
多くの高併発の環境の下で、よく私達はいくつかの任務が一回だけ実行する必要があります。このような使用シーンはFutureTaskの特性が適任です。
例えば、keyを有する接続池があると仮定し、keyが存在する場合、即ち直接keyに対応するオブジェクトに戻る。keyが存在しない場合、接続を作成します。
このようなアプリケーションシーンに対して、典型的なコードは、以下のように1つのMapオブジェクトを使用してkeyと接続プールの対応関係を記憶する方法である。

private Map<String, Connection> connectionPool = new HashMap<String, Connection>();  
private ReentrantLock lock = new ReentrantLock();    
public Connection getConnection(String key){  
    try{  
        lock.lock();  
        if(connectionPool.containsKey(key)){  
            return connectionPool.get(key);  
        }  
        else{  
            //   Connection  
            Connection conn = createConnection();  
            connectionPool.put(key, conn);  
            return conn;  
        }  
    }  
    finally{  
        lock.unlock();  
    }  
}  
  
//  Connection(      ,   Connection)  
private Connection createConnection(){  
    return null;  
}  
上記の例では、ロックをかけて高合併環境でスレッドの安全を確保し、connectionを一度だけ作成することも確認しましたが、性能を犠牲にしました。Conccurrenthashに変更すると、ロックをかける操作はほとんど避けられ、性能は大幅に向上しますが、高合併の場合はConnectionが何度も作成されることがあります。
この時最も解決しなければならない問題は、keyが存在しない場合、Connectionを作成する動作がconnection Poolの後に実行されます。これはFutureTaskが機能するタイミングです。ConcerenthashMapとFutureTaskに基づく改造コードは以下の通りです。

private ConcurrentHashMap<String,FutureTask<Connection>>connectionPool = new ConcurrentHashMap<String, FutureTask<Connection>>();  
  
public Connection getConnection(String key) throws Exception{  
    FutureTask<Connection>connectionTask=connectionPool.get(key);  
    if(connectionTask!=null){  
        return connectionTask.get();  
    }  
    else{  
        Callable<Connection> callable = new Callable<Connection>(){  
            @Override  
            public Connection call() throws Exception {  
                // TODO Auto-generated method stub  
                return createConnection();  
            }  
        };  
        FutureTask<Connection>newTask = new FutureTask<Connection>(callable);  
        connectionTask = connectionPool.putIfAbsent(key, newTask);  
        if(connectionTask==null){  
            connectionTask = newTask;  
            connectionTask.run();  
        }  
        return connectionTask.get();  
    }  
}  
  
//  Connection(      ,   Connection)  
private Connection createConnection(){  
    return null;  
}  
このような改造を経て、同時による何度ものコネクションやロックの創建を避けることができます。
3部分のソースコードの分析
3.1構造方法

public FutureTask(Runnable runnable, V result) {  
        this.callable = Executors.callable(runnable, result);  
        this.state = NEW;       // ensure visibility of callable  
 } 
3.2キャンセル

//              running  
     public boolean cancel(boolean mayInterruptIfRunning) {  
          /** 
          *             
          *     if(state!=new || !UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)) 
          *         state  new        ,                             
          *    state  new    state              
          * 
          **/  
        if (!(state == NEW &&  
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,  
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))  
            return false;  
        try {    // in case call to interrupt throws exception  
            //                            INTERRUPTED  
            if (mayInterruptIfRunning) {  
                try {  
                    Thread t = runner;  
                    if (t != null)  
                        t.interrupt();  
                } finally { // final state  
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);  
                }  
            }  
        } finally {  
            finishCompletion();  
        }  
        return true;  
    }  
以上は個人の経験ですので、参考にしていただければと思います。