Java同時モード


詳細
会社で研修をするときに使いますが、ついでにここでまとめます.
1.生産者消費者モデル
あるモジュールはデータの生成を担当し、これらのデータは別のモジュールが処理します.(ここでのモジュールは広義であり、クラス、関数、スレッド、プロセスなどであってもよい).データを生成するモジュールは、イメージ的に生産者と呼ばれ、データを処理するモジュールは、消費者と呼ばれる.生産者と消費者の間にバッファを付けて、私たちのイメージは倉庫と呼ばれ、生産者は倉庫に商品を入れ、消費者は倉庫から商品を取る.これは生産者の消費者モデルを構成している.
/**
 * 
 *      :    
 *
 * @author mengqingyu
 * @version $Id: codetemplates.xml,v 1.1 2009/03/06 01:13:01 mengqingyu Exp  $
 * Create:  2013-4-2   11:01:53
 */
public class MyData {
	
	private final long intData;
	
	public MyData(long d){
		intData = d;
	}
	
	@Override
	public String toString(){
		return " MyData:"+intData;
	}
}

import java.util.concurrent.BlockingQueue;

public abstract class AbstractPC implements Runnable {
	
	protected BlockingQueue queue;
	
	protected volatile boolean isRunning = true;
	
	public void stop() {
		isRunning = false;
	}
}

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 
 *      :   
 *
 * @author mengqingyu
 * @version $Id: codetemplates.xml,v 1.1 2009/03/06 01:13:01 mengqingyu Exp  $
 * Create:  2013-4-2   11:02:13
 */
public class Producer extends AbstractPC{
	
	private static AtomicInteger count = new AtomicInteger(0);
	
	private static final int SLEEP_TIME = 3000;

	public Producer(BlockingQueue queue) {
		this.queue = queue;
	}

	public void run() {
		System.out.println("Producer:"+Thread.currentThread().getName()+" start");
		try {
			while (isRunning) {
				Thread.sleep(SLEEP_TIME);
				MyData data = new MyData(count.incrementAndGet());
				queue.put(data);
				System.out.println(Thread.currentThread().getName()+" produce:" + data);
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

import java.util.concurrent.BlockingQueue;

/**
 * 
 *      :   
 *
 * @author mengqingyu
 * @version $Id: codetemplates.xml,v 1.1 2009/03/06 01:13:01 mengqingyu Exp  $
 * Create:  2013-4-2   11:02:22
 */
public class Consumer extends AbstractPC{
	
	private static final int SLEEP_TIME = 3000;
	
	public Consumer(BlockingQueue queue) {
		this.queue = queue;
	}

	public void run() {
		System.out.println("Consumer:"+Thread.currentThread().getName()+" start");
		try {
			while(isRunning){
				Thread.sleep(SLEEP_TIME);
				MyData data = queue.take();
				if (null != data) {
					System.out.println(Thread.currentThread().getName()+" consume:"+data);
				}
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * 
 *      :          ,       。
 *
 * @author mengqingyu
 * @version $Id: codetemplates.xml,v 1.1 2009/03/06 01:13:01 mengqingyu Exp  $
 * Create:  2013-3-28   01:10:51
 */
public class Main {
	public static void main(String[] args) throws InterruptedException {
		BlockingQueue queue = new LinkedBlockingQueue(10);
		Producer producer = new Producer(queue);
		Consumer consumer = new Consumer(queue);
		ExecutorService service = Executors.newCachedThreadPool();
		service.execute(producer);
		service.execute(producer);
		service.execute(producer);
		service.execute(consumer);
		Thread.sleep(10000);
		producer.stop();
		System.out.println("producer stop!");
		Thread.sleep(10000);
		System.out.println("consumer stop!");
		consumer.stop();
		service.shutdown();
	}
}

//                
public interface BlockingQueue {
	
	/**
	 * 
	 * @function:    
	 * @param e
	 * @throws InterruptedException
	 * @author: mengqingyu    2013-4-2   10:39:19
	 */
	void put(T e) throws InterruptedException;
	
	/**
	 * 
	 * @function:    
	 * @return
	 * @throws InterruptedException
	 * @author: mengqingyu    2013-4-2   10:39:16
	 */
	T take() throws InterruptedException;
}

/**
 * 
 *      :         
 *
 * @author mengqingyu
 * @version $Id: codetemplates.xml,v 1.1 2009/03/06 01:13:01 mengqingyu Exp @param  $
 * Create:  2013-4-2   11:04:29
 */
public class LinkedBlockingQueue implements BlockingQueue{

	//    
    private final int capacity;

    //   
    private final AtomicInteger count = new AtomicInteger(0);

    //     
    private transient Node head;

    //     
    private transient Node last;
    
    // 
	private Lock lock = new ReentrantLock();  
	
	//      
	private Condition condition_producer = lock.newCondition();  
	
	//      
    private Condition condition_consumer = lock.newCondition(); 
	
    //     
    static class Node {
        volatile T item; //   
        Node next;	//    
        Node(T x) { item = x; }
    }
    
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node(null);
    }
    
	@Override
	public void put(T t) throws InterruptedException {
		if (t == null) throw new NullPointerException();
        lock.lock();
        try { 
			while (count.get() == capacity)	//        ,                         。
				condition_producer.await();  
			insert(t);//    
        	System.out.println(Thread.currentThread().getName()+" push:"+(capacity-1)+t);
            count.getAndIncrement();//    1
            condition_consumer.signalAll();
		} catch (InterruptedException e) {
			condition_consumer.signalAll(); 
			e.printStackTrace();
		}
		finally {
			lock.unlock();
		}
	}

	/**
	 * 
	 * @function:    
	 * @param x
	 * @author: mengqingyu    2013-4-2   10:36:52
	 */
    private void insert(T x) {
        last = last.next = new Node(x);
    }
    
	@Override
	public T take() throws InterruptedException {
		T x = null;
		lock.lock();
		try{
			while (count.get() == 0)
				condition_consumer.await();
			x = extract();
			System.out.println(Thread.currentThread().getName()+" pop:"+capacity+x);
			count.getAndDecrement();
			condition_producer.signalAll(); 
		} catch (InterruptedException e) {
			condition_producer.signalAll(); 
			e.printStackTrace();
		}
		finally {
			lock.unlock();
		}
		return x;
	}
	
	/**
	 * 
	 * @function:    
	 * @return
	 * @author: mengqingyu    2013-4-2   10:36:31
	 */
    private T extract() {
        Node first = head.next;
        head = first;
        T x = first.item;
        first.item = null;
        return x;
    }
}

2.futureモード
Java 5からJavaはRunnableインタフェースの拡張版であるCallableインタフェースを提供し,Callableインタフェースはスレッド実行体として機能するcall()メソッドを提供するが,call()メソッドはrun()メソッドよりも機能が強い.
call()メソッドには戻り値があり、call()メソッドは放出異常を宣言します.
import java.util.concurrent.Callable;

/**
 * 
 *      :  Callable    call  ,          ,              。
 *
 * @author mengqingyu
 * @version $Id: codetemplates.xml,v 1.1 2009/03/06 01:13:01 mengqingyu Exp  $
 * Create:  2013-4-2   11:06:47
 */
public class DataService implements Callable {
	
    private String param;
    
    private static final int SLEEP_TIME = 1000;
    
    public DataService(String param){
    	this.param = param;
    }
    
	@Override
	public String call() throws InterruptedException{
    	StringBuffer sb=new StringBuffer();
        for (int i = 0; i < 5; i++) {
        	sb.append(param);
        	System.out.println("Chlid Thread wait 1 second...");
			Thread.sleep(SLEEP_TIME);
        }
        return sb.toString();
	}
}

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

/**
 * 
 *      :jdk  ,      
 *
 * @author mengqingyu
 * @version $Id: codetemplates.xml,v 1.1 2009/03/06 01:13:01 mengqingyu Exp  $
 * Create:  2013-3-28   01:11:43
 */
public class Main {
    public static void main(String[] args) {
        FutureTask future = new FutureTask(new DataService("Hello "));
        ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            executor.execute(future);
        	System.out.println("Main Thread wait 2 second...");
            Thread.sleep(2000);
        } catch (InterruptedException e) {
        	e.printStackTrace();
        }
        System.out.println("future.get() before");
        try {
			System.out.println("result:" + future.get());//    ,       
		} catch (Exception e) {
			e.printStackTrace();
		} 
		executor.shutdown();
    }
}

//   future     
public interface Callable {
	T call() throws Exception;;
}

public interface Future {
    T get()throws Exception;
}

/**
 * 
 *      :future    ,               
 *
 * @author mengqingyu
 * @version $Id: codetemplates.xml,v 1.1 2009/03/06 01:13:01 mengqingyu Exp @param  $
 * Create:  2013-4-2   11:08:52
 */
public class FutureTask implements Future,Runnable {
	
	private T data;
    
    private Exception exception;
    
    private Callable callable;
    
    private boolean isReady = false;
    
    public FutureTask(Callable callable) {
    	this.callable =  callable;
	}
    
    public synchronized T get() throws Exception {
        while (!isReady) {
            try {
                wait();
            } catch (InterruptedException e) {
            	e.printStackTrace();
            }
        }
        if (exception != null)
        	 throw exception;
        return data;
    }

	@Override
	public synchronized void run() {
        if (isReady) {                        
            return;     
        }
		try {
			data = callable.call();
		} catch (Exception e) {
			exception = e;
		}
		isReady = true;
		notifyAll();
	}
}