JDK 5のCallableとFutureの応用:スレッドの実行結果を取得する


一、CallableインタフェースとFutureの使用例(Javaプログラミング思想から引用)
Runnableは、作業を実行する独立したタスクですが、値は返されません.タスクの完了時に値を返す場合は、RunnableインタフェースではなくCallableインタフェースを実装できます.Java SE 5に導入されたCallableは、run()ではなくメソッドcall()から返される値を表すタイプパラメータを持つ汎用型であり、ExecutorServices.submit()メソッドを使用して呼び出さなければならない簡単な例を示します.
package chapter21;

import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class TaskWithResult implements Callable {
	private int id;

	public TaskWithResult(int id) {
		this.id = id;
	}

	public String call() {
		return "result of TaskWithResult " + id;
	}
}

public class CallableDemo {
	public static void main(String[] args) {
		ExecutorService exec = Executors.newCachedThreadPool();
		ArrayList> results = new ArrayList>();
		for (int i = 0; i < 10; i++)
			//submit()     Future  
			results.add(exec.submit(new TaskWithResult(i)));
		
		for (Future fs : results)
			try {
				//    ,       
				System.out.println(fs.isDone());
				System.out.println(fs.get());
			} catch (InterruptedException e) {
				System.out.println(e);
				return;
			} catch (ExecutionException e) {
				System.out.println(e);
			} finally {
				//     
				exec.shutdown();
			}
	}
}

 
    
   
 
  
true
result of TaskWithResult 0
true
result of TaskWithResult 1
true
result of TaskWithResult 2
true
result of TaskWithResult 3
true
result of TaskWithResult 4
true
result of TaskWithResult 5
true
result of TaskWithResult 6
true
result of TaskWithResult 7
true
result of TaskWithResult 8
true
result of TaskWithResult 9

submit() Future , Callable 。 isDone() Future 。 , , get() 。 isDone() get(), ,get() , 。 get() , get(), isDone() 。

、Callable Future ( Java ( ))

Runnable , 。Callable Runnable , 。Callable , call。
public interface Callable {
    V call() throws Exception;
}
。 :Callable Integer 。
Future , , Future , 。Future 。
Future :
 
   
public interface Future {

    /**
     *        。         ,         。         ,     true,     。
     */
    boolean cancel(boolean mayInterruptIfRunning);

    /**
     *            ,  true
     */
    boolean isCancelled();

    /**
     *         ,     false;     ,   true。
     */
    boolean isDone();

    /**
     *          ,      ,             ,     InterruptedException。        ,  get()      。
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     *          ,       ,   TimeoutException  。          .
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
 
   
 
  
FutureTask , Callable Future Runnable, 。
: Future Runnable 。
public FutureTask(Callable callable) {
       if (callable == null)
           throw new NullPointerException();
       this.callable = callable;
       this.state = NEW;       // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
       this.callable = Executors.callable(runnable, result);
       this.state = NEW;       // ensure visibility of callable
}
, ,main :
package chapter14;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

/**
 *                          
 */
public class FutureTest {

	public static void main(String[] args) {
		//Scanner in = new Scanner(System.in);
		//System.out.println("Enter base directory (e.g. /usr/local/jdk/src)");
		//String directory = in.nextLine();
		//System.out.println("Enter keyword (e.g volatile)");
		//String keyword = in.nextLine();
		
		String directory = "D:\\test";
		String keyword = "jdbc";
		
		MatchCounter counter = new MatchCounter(new File(directory), keyword);
		FutureTask task = new FutureTask(counter);
		Thread t = new Thread(task);
		t.start();//    ,  call()    
		
		try {
			System.out.println(task.get() + " matching files.");
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		}
		
	}
	
}

class MatchCounter implements Callable {
	
	private File directory;//      
	private String keyword;//       
	private int count;//  
	
	public MatchCounter(File directory, String keyword) {
		this.directory = directory;
		this.keyword = keyword;
	}

	public Integer call() throws Exception {
		count = 0;
		File[] files = directory.listFiles();//             
		List> results = new ArrayList>();
		
		//       
		for(File file : files) {
			if(file.isDirectory()) {
				MatchCounter counter = new MatchCounter(file, keyword);
				// Callable     Future  Runnable   
				FutureTask task = new FutureTask(counter);
				results.add(task);
				Thread t = new Thread(task);
				t.start();
			}else { 
				if(search(file)) {
					count ++;
				}
			}
		}
		
		for(Future result:results) {
			count += result.get();
		}
		//    
		return count;
	}
	
	/**
	 *             
	 * @param file
	 * @return
	 */
	public boolean search(File file) {
		InputStream is = null;
		Scanner in = null;
		try{
			is = new FileInputStream(file);
			in = new Scanner(is);
			boolean found = false;
			while(!found && in.hasNextLine()) {
				//    
				String line = in.nextLine();
				//       ,     
				if(line.contains(keyword)) {
					found = true;
				}
			}
			return found;
		} catch (FileNotFoundException e) {
			return false;
		} finally{
			try {
				if(is != null) {
					is.close();
				}
				if(in != null) {
					in.close();
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
	
}
2 matching files.

、 CompletionService ( Java )


package tradition;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 *   Callable      
 */
public class CallableAndFuture {

	public static void main(String[] args) {
		//            
		ExecutorService threadPool = 
				Executors.newSingleThreadExecutor();
		//  Callable          
		Future future = 
		threadPool.submit(
			new Callable() {
				@Override
				public String call() throws Exception {
					Thread.sleep(2000);
					return "hello";
				}
				
			}
		);
		System.out.println("    :");
		try {
			System.out.println("    :" + future.get());
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		}
		
		//              
		ExecutorService threadPool2 = Executors.newFixedThreadPool(10);
		//     ExecutorCompletionService,            
		CompletionService completionService = 
				new ExecutorCompletionService<>(threadPool2);
		//  10   
		for(int i = 1; i <= 10; i ++) {
			final int seq = i;
			completionService.submit(new Callable(){
				@Override
				public Integer call() throws Exception {
					Thread.sleep(new Random().nextInt(5000));
					return seq;
				}
			});
		}
		
		for(int i = 0; i < 10; i ++) {
			//      
			try {
				System.out.println(completionService.take().get());
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
		}
	}
	
}