Java同時モード
詳細
会社で研修をするときに使いますが、ついでにここでまとめます.
1.生産者消費者モデル
あるモジュールはデータの生成を担当し、これらのデータは別のモジュールが処理します.(ここでのモジュールは広義であり、クラス、関数、スレッド、プロセスなどであってもよい).データを生成するモジュールは、イメージ的に生産者と呼ばれ、データを処理するモジュールは、消費者と呼ばれる.生産者と消費者の間にバッファを付けて、私たちのイメージは倉庫と呼ばれ、生産者は倉庫に商品を入れ、消費者は倉庫から商品を取る.これは生産者の消費者モデルを構成している.
2.futureモード
Java 5からJavaはRunnableインタフェースの拡張版であるCallableインタフェースを提供し,Callableインタフェースはスレッド実行体として機能するcall()メソッドを提供するが,call()メソッドはrun()メソッドよりも機能が強い.
call()メソッドには戻り値があり、call()メソッドは放出異常を宣言します.
会社で研修をするときに使いますが、ついでにここでまとめます.
1.生産者消費者モデル
あるモジュールはデータの生成を担当し、これらのデータは別のモジュールが処理します.(ここでのモジュールは広義であり、クラス、関数、スレッド、プロセスなどであってもよい).データを生成するモジュールは、イメージ的に生産者と呼ばれ、データを処理するモジュールは、消費者と呼ばれる.生産者と消費者の間にバッファを付けて、私たちのイメージは倉庫と呼ばれ、生産者は倉庫に商品を入れ、消費者は倉庫から商品を取る.これは生産者の消費者モデルを構成している.
/**
*
* :
*
* @author mengqingyu
* @version $Id: codetemplates.xml,v 1.1 2009/03/06 01:13:01 mengqingyu Exp $
* Create: 2013-4-2 11:01:53
*/
public class MyData {
private final long intData;
public MyData(long d){
intData = d;
}
@Override
public String toString(){
return " MyData:"+intData;
}
}
import java.util.concurrent.BlockingQueue;
public abstract class AbstractPC implements Runnable {
protected BlockingQueue queue;
protected volatile boolean isRunning = true;
public void stop() {
isRunning = false;
}
}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
* :
*
* @author mengqingyu
* @version $Id: codetemplates.xml,v 1.1 2009/03/06 01:13:01 mengqingyu Exp $
* Create: 2013-4-2 11:02:13
*/
public class Producer extends AbstractPC{
private static AtomicInteger count = new AtomicInteger(0);
private static final int SLEEP_TIME = 3000;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
System.out.println("Producer:"+Thread.currentThread().getName()+" start");
try {
while (isRunning) {
Thread.sleep(SLEEP_TIME);
MyData data = new MyData(count.incrementAndGet());
queue.put(data);
System.out.println(Thread.currentThread().getName()+" produce:" + data);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
import java.util.concurrent.BlockingQueue;
/**
*
* :
*
* @author mengqingyu
* @version $Id: codetemplates.xml,v 1.1 2009/03/06 01:13:01 mengqingyu Exp $
* Create: 2013-4-2 11:02:22
*/
public class Consumer extends AbstractPC{
private static final int SLEEP_TIME = 3000;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
System.out.println("Consumer:"+Thread.currentThread().getName()+" start");
try {
while(isRunning){
Thread.sleep(SLEEP_TIME);
MyData data = queue.take();
if (null != data) {
System.out.println(Thread.currentThread().getName()+" consume:"+data);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
/**
*
* : , 。
*
* @author mengqingyu
* @version $Id: codetemplates.xml,v 1.1 2009/03/06 01:13:01 mengqingyu Exp $
* Create: 2013-3-28 01:10:51
*/
public class Main {
public static void main(String[] args) throws InterruptedException {
BlockingQueue queue = new LinkedBlockingQueue(10);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
ExecutorService service = Executors.newCachedThreadPool();
service.execute(producer);
service.execute(producer);
service.execute(producer);
service.execute(consumer);
Thread.sleep(10000);
producer.stop();
System.out.println("producer stop!");
Thread.sleep(10000);
System.out.println("consumer stop!");
consumer.stop();
service.shutdown();
}
}
//
public interface BlockingQueue {
/**
*
* @function:
* @param e
* @throws InterruptedException
* @author: mengqingyu 2013-4-2 10:39:19
*/
void put(T e) throws InterruptedException;
/**
*
* @function:
* @return
* @throws InterruptedException
* @author: mengqingyu 2013-4-2 10:39:16
*/
T take() throws InterruptedException;
}
/**
*
* :
*
* @author mengqingyu
* @version $Id: codetemplates.xml,v 1.1 2009/03/06 01:13:01 mengqingyu Exp @param $
* Create: 2013-4-2 11:04:29
*/
public class LinkedBlockingQueue implements BlockingQueue{
//
private final int capacity;
//
private final AtomicInteger count = new AtomicInteger(0);
//
private transient Node head;
//
private transient Node last;
//
private Lock lock = new ReentrantLock();
//
private Condition condition_producer = lock.newCondition();
//
private Condition condition_consumer = lock.newCondition();
//
static class Node {
volatile T item; //
Node next; //
Node(T x) { item = x; }
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node(null);
}
@Override
public void put(T t) throws InterruptedException {
if (t == null) throw new NullPointerException();
lock.lock();
try {
while (count.get() == capacity) // , 。
condition_producer.await();
insert(t);//
System.out.println(Thread.currentThread().getName()+" push:"+(capacity-1)+t);
count.getAndIncrement();// 1
condition_consumer.signalAll();
} catch (InterruptedException e) {
condition_consumer.signalAll();
e.printStackTrace();
}
finally {
lock.unlock();
}
}
/**
*
* @function:
* @param x
* @author: mengqingyu 2013-4-2 10:36:52
*/
private void insert(T x) {
last = last.next = new Node(x);
}
@Override
public T take() throws InterruptedException {
T x = null;
lock.lock();
try{
while (count.get() == 0)
condition_consumer.await();
x = extract();
System.out.println(Thread.currentThread().getName()+" pop:"+capacity+x);
count.getAndDecrement();
condition_producer.signalAll();
} catch (InterruptedException e) {
condition_producer.signalAll();
e.printStackTrace();
}
finally {
lock.unlock();
}
return x;
}
/**
*
* @function:
* @return
* @author: mengqingyu 2013-4-2 10:36:31
*/
private T extract() {
Node first = head.next;
head = first;
T x = first.item;
first.item = null;
return x;
}
}
2.futureモード
Java 5からJavaはRunnableインタフェースの拡張版であるCallableインタフェースを提供し,Callableインタフェースはスレッド実行体として機能するcall()メソッドを提供するが,call()メソッドはrun()メソッドよりも機能が強い.
call()メソッドには戻り値があり、call()メソッドは放出異常を宣言します.
import java.util.concurrent.Callable;
/**
*
* : Callable call , , 。
*
* @author mengqingyu
* @version $Id: codetemplates.xml,v 1.1 2009/03/06 01:13:01 mengqingyu Exp $
* Create: 2013-4-2 11:06:47
*/
public class DataService implements Callable {
private String param;
private static final int SLEEP_TIME = 1000;
public DataService(String param){
this.param = param;
}
@Override
public String call() throws InterruptedException{
StringBuffer sb=new StringBuffer();
for (int i = 0; i < 5; i++) {
sb.append(param);
System.out.println("Chlid Thread wait 1 second...");
Thread.sleep(SLEEP_TIME);
}
return sb.toString();
}
}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
/**
*
* :jdk ,
*
* @author mengqingyu
* @version $Id: codetemplates.xml,v 1.1 2009/03/06 01:13:01 mengqingyu Exp $
* Create: 2013-3-28 01:11:43
*/
public class Main {
public static void main(String[] args) {
FutureTask future = new FutureTask(new DataService("Hello "));
ExecutorService executor = Executors.newFixedThreadPool(1);
try {
executor.execute(future);
System.out.println("Main Thread wait 2 second...");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("future.get() before");
try {
System.out.println("result:" + future.get());// ,
} catch (Exception e) {
e.printStackTrace();
}
executor.shutdown();
}
}
// future
public interface Callable {
T call() throws Exception;;
}
public interface Future {
T get()throws Exception;
}
/**
*
* :future ,
*
* @author mengqingyu
* @version $Id: codetemplates.xml,v 1.1 2009/03/06 01:13:01 mengqingyu Exp @param $
* Create: 2013-4-2 11:08:52
*/
public class FutureTask implements Future,Runnable {
private T data;
private Exception exception;
private Callable callable;
private boolean isReady = false;
public FutureTask(Callable callable) {
this.callable = callable;
}
public synchronized T get() throws Exception {
while (!isReady) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (exception != null)
throw exception;
return data;
}
@Override
public synchronized void run() {
if (isReady) {
return;
}
try {
data = callable.call();
} catch (Exception e) {
exception = e;
}
isReady = true;
notifyAll();
}
}