シンクロナイザ


LatchラッチCountDownLatch , countDown , , await。
カウントをリセットできません.カウントをリセットする必要がある場合は、CyclicBarrierを使用することを考慮してください.特定のアクティビティのセットが、アクティビティが完了するまで待機していることを確認します.例えば、1、リソースRが初期化される前に、そのリソースを使用するすべてのアクティビティが待機している.2、サービスSを解放する前に、Sに依存するすべてのサービスを完了する.3、マルチプレイを開始する前に、すべての参加者の端末が接続されていることを確認します.
public class TestHarness {

	public long timeTasks(int nThreads, final Runnable task) throws InterruptedException {
		final CountDownLatch startGate = new CountDownLatch(1);
		final CountDownLatch endGate = new CountDownLatch(nThreads);

		for (int i = 0; i < nThreads; i++) {
			Thread t = new Thread() {
				public void run() {
					try {
						startGate.await();
						try {
							task.run();
						} finally {
							endGate.countDown();
						}
					} catch (InterruptedException ignored) {
					}
				}
			};
			t.start();
		}

		long start = System.nanoTime();

		System.out.println("open!");
		startGate.countDown();

		endGate.await();
		System.out.println("close!");

		long end = System.nanoTime();
		return end - start;
	}

	public static void main(String[] ss) throws InterruptedException {
		System.out.println(new TestHarness().timeTasks(10, new Thread() {
			public void run() {
				System.out.println("I am in!");
			}
		}));
	}
}
 
FutureTaskの将来のタスク
キャンセル可能な非同期計算.計算の開始とキャンセルの方法、計算が完了したかどうかをクエリーする方法、計算結果を取得する方法を使用して、Futureの基本的な実装を提供します.
計算が完了した場合にのみ結果を取得できます.計算がまだ完了していない場合はgetメソッドをブロックします.計算が完了すると、計算を再開したりキャンセルしたりすることはできません.
三状態:
waiting to run
running
completed
 
例1
public class Preloader {
	
	private ProductInfo loadProductInfo() throws DataLoadException {
		System.out.println("load product information from a database...");
		return null;
	}

	private final FutureTask<ProductInfo> future = new FutureTask<ProductInfo>(
			new Callable<ProductInfo>() {
				public ProductInfo call() throws DataLoadException {
					return loadProductInfo();
				}
			});
	private final Thread thread = new Thread(future);

	public void start() {
		thread.start();
		System.out.println("thread is started.");
	}

	public ProductInfo get() throws DataLoadException, InterruptedException {
		try {
			return future.get();
		} catch (ExecutionException e) {
			Throwable cause = e.getCause();
			if (cause instanceof DataLoadException)
				throw (DataLoadException) cause;
			else
				throw LaunderThrowable.launderThrowable(cause);
		}
	}

	interface ProductInfo {
	}
	
	public static void main(String[] ss) throws DataLoadException, InterruptedException{
		Preloader loader=new Preloader();
		loader.start();
		System.out.println("---- ----");
		loader.get();
	}
}

class DataLoadException extends Exception {
	private static final long serialVersionUID = 7984266484349694761L;
}
 
例2
package creative.fire.concurrent.cache;

import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class ResourceCache {
	private static ConcurrentHashMap<String, FutureTask<Resource>> resourceMap = new ConcurrentHashMap<String, FutureTask<Resource>>(
			1000, 0.7f);

	private Resource retrieveFromDB(String resId) {
		Resource res = null;
		System.out.println("retrieve " + resId + " from database.");
		return res;
	}

	public Resource get(final String resId) throws InterruptedException,
			ExecutionException {
		FutureTask<Resource> resTask = resourceMap.get(resId);
		if (resTask != null){
			System.out.println("get " + resId + " from cache.");
			return resTask.get();
		}
			

		FutureTask<Resource> newTask = new FutureTask<Resource>(
				new Callable<Resource>() {
					public Resource call() throws Exception {
						return retrieveFromDB(resId);
					}
				});

		FutureTask<Resource> task = resourceMap.putIfAbsent(resId, newTask);
		if (task == null) {
			task = newTask;
			task.run();
		}
		return task.get();
	}

	interface Resource {

	}

	public static void main(String[] ss) throws InterruptedException,ExecutionException {
		ResourceCache cache = new ResourceCache();
		cache.get("Device_ABC");
		cache.get("Device_ABC");
	}
}

 
Semaphore信号は、同じ時間に特定のリソースにアクセスしたり、操作を実行したりするアクティビティのセットを制御します.
必要に応じて、ライセンスが使用可能になる前に各acquire()がブロックされ、ライセンスが取得されます.各release()にライセンスが追加され、ブロックされている取得者が解放される可能性があります.
Semaphoreは、通常、特定のリソース(物理的または論理的)にアクセスできるスレッドの数を制限するために使用されます.
public class SemaphoreBoundedBuffer<E> {
	private final Semaphore availableItems, availableSpaces;
	@GuardedBy("this")
	private final E[] items;
	@GuardedBy("this")
	private int putPosition = 0, takePosition = 0;

	public SemaphoreBoundedBuffer(int capacity) {
		if (capacity <= 0)
			throw new IllegalArgumentException();
		availableItems = new Semaphore(0);
		availableSpaces = new Semaphore(capacity);
		items = (E[]) new Object[capacity];
	}

	public boolean isEmpty() {
		return availableItems.availablePermits() == 0;
	}

	public boolean isFull() {
		return availableSpaces.availablePermits() == 0;
	}

	public void put(E x) throws InterruptedException {
		if (availableSpaces.tryAcquire(2, TimeUnit.SECONDS)) {
			System.out.println("availableSpaces acquire");
			System.out.println("available Spaces="+availableSpaces.availablePermits());
			doInsert(x);
			availableItems.release();
			System.out.println("availableItems release");
			System.out.println("available Items="+availableItems.availablePermits());
		} else {
			System.out.println("time out.");
		}
	}

	public E take() throws InterruptedException {
		// availableItems.acquire();
		if (availableItems.tryAcquire(2, TimeUnit.SECONDS)) {
			System.out.println("availableItems acquire");
			System.out.println("available Items="+availableItems.availablePermits());
			E item = doExtract();
			availableSpaces.release();
			System.out.println("availableSpaces release");
			System.out.println("available Spaces="+availableSpaces.availablePermits());
			return item;
		} else {
			System.out.println("time out.");
			return null;
		}
	}

	private synchronized void doInsert(E x) {
		int i = putPosition;
		items[i] = x;
		putPosition = (++i == items.length) ? 0 : i;
		System.out.println("insert " + x);
	}

	private synchronized E doExtract() {
		int i = takePosition;
		E x = items[i];
		items[i] = null;
		takePosition = (++i == items.length) ? 0 : i;
		System.out.println("extract " + x);
		return x;
	}

	public static void main(String[] ss) throws InterruptedException {
		final SemaphoreBoundedBuffer<Integer> buffer = new SemaphoreBoundedBuffer<Integer>(
				2);
		buffer.put(1);
		System.out.println("---- ----");
		buffer.put(2);
		System.out.println("---- ----");
		buffer.put(3);
		System.out.println("---- ----");
		buffer.take();
		System.out.println("---- ----");
		buffer.take();
		System.out.println("---- ----");
		buffer.take();
	}
}

 
Barrierバリア
共通のバリアポイント(common barrier point)に達するまでスレッドのセットを互いに待つことができます.一定サイズのスレッドのセットに関連するプログラムでは、これらのスレッドは常に互いに待たなければならない.この場合、CyclicBarrierは有用である.このbarrierは、待機スレッドを解放した後に再利用できるため、ループのbarrierと呼ばれる.
ラッチはイベントを待つために使用され、バリアは他のスレッドを待つために使用されます.
ラッチカウントが0に減少するとawait後のコードが実行されます.バリアはawait数が設定数に達した後、下に進みます.
public class CyclicBarrierTest {
	public static void main(String[] args) throws InterruptedException,
			BrokenBarrierException {
		int count = Runtime.getRuntime().availableProcessors();
		System.out.println("available processors = " + count);

		if (count == 1)
			count = 4;

		final CyclicBarrier barrier = new CyclicBarrier(count, new Runnable() {
			@Override
			public void run() {
				System.out.println("figure out what they are doing next.");
			}
		});

		ExecutorService exec = Executors.newFixedThreadPool(count);
		while (count > 0) {
			exec.execute(new Thread(count + "") {
				public void run() {
					System.out.println(getName() + " gets McDonald");
					try {
						barrier.await();
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					} catch (BrokenBarrierException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
			});
			Thread.sleep(500);
			count--;
		}
		exec.shutdown();
	}
}