JAva同時プログラミング-ブロックの構築


JAva同時プログラミング-EXecutorフレームワーク
JAva 5は多くの新しい同時コンテナとツールを導入し、同時プログラムの作成を極めて簡素化した.ここではまずCollectionsについて説明する.SynchronizedXXXファクトリメソッドで作成された同期コンテナの不足については、ConcurrentHashMap、CopyOnWriterArrayList、BlockingQueue、CountDownLatch、Semaphore、CyclicBarrier、および表示ロッククラスについて説明します.
 
一、引用
すべての同時問題は、アクセスの同時状態をどのように調整するかに由来し、可変状態が少ないほど、同時制御が容易になります.可変状態のないオブジェクトは常に既存のセキュリティです.可変状態のオブジェクトへの同時アクセスはロックする必要があります.各メソッドにロックをかけると、同時アクセスが一定の状態になることは保証されません.コンカレント・プログラムを構築するときに、クラス・ライブラリが提供するコンカレント・コンテナやツールなど、コンカレントの制御エージェントを既存のコンカレント・クラスに渡すことができます.
二、同期容器の不足
Collections.synchronizedXXXファクトリメソッドで作成された同期コンテナは、各メソッドがコンテナの内部ロックを使用して制御されます.これは、複数の安全な読み取り操作も排他ロックを待つため、パフォーマンスの問題をもたらします.各メソッドの呼び出しがスレッド安全であっても、複数の操作室を同時に呼び出すことは、必ずしもスレッド安全ではない.例えば、欠落、追加、等しい、修正などの炒め物は、2ステップの原子操作で構成され、一緒に原子ではないので、呼び出しコードに容器のロックを使用して制御しなければならない.また,反復集合の場合,コンカレント修正によりConcurrentModefiedExceptioinが放出されることもあり,これは呼び出しプログラムが望ましくない結果となることが多い.
三、ConcurrentHashMap,CopyOnWriterArrayList
ConcurrentHashMapで使用される内部細粒度の分離ロックは、任意の数のリードスレッドの同時アクセスを可能にし、スループットを向上させる.反復中に変更が発生した場合は、反復開始時の状態を返します.また、欠落した場合に追加したり、等しい場合に修正したりする二元操作にも対応する方法がサポートされています.ConcurrentHashMapは、ConcurrentMapが提供するいくつかの特殊な原子操作を実現します.
public V putIfAbsent(K key,  V value)
指定したキーが値に関連付けられなくなった場合は、指定した値に関連付けられます.
public boolean remove(Object key, Object value)
キーのエントリは、現在キーのエントリを指定した値にマッピングしている場合にのみ削除されます.
public boolean replace(K key, V oldValue, V newValue)
キーのエントリは、現在キーのエントリが指定された値にマッピングされている場合にのみ置き換えられます.
public V replace(K key, V value)
キーのエントリは、現在キーのエントリを値にマップしている場合にのみ置き換えられます.
 
CopyOnWriterArrayListはArrayListの同時代替品であり、通常、複数の現在同時で反復できる比較的良い同時性を提供する.変更が必要になるたびに、新しいコンテナコピーが作成され、再パブリッシュされ、可変性が実現されます.下位層は配列実装を使用するため、配列要素が多い場合、レプリケーションに多くのコストがかかります.
三、BlockingQueue
ブロックキューは、ブロック可能なputメソッドとtakeメソッド、およびそれに等価なタイムアウトを指定できるofferおよびpollを提供する.Queueが空の場合、takeメソッドは要素が使用可能になるまでブロックされます.Queueが有線長であれば、キューがいっぱいになるとputメソッドもブロックされます.BlockingQueueは生産者と消費者モデルをよくサポートすることができ、生産者はキューにputし、消費者はキューからgetし、両者はよく同期することができる.BlockingQueueの実装クラスLinkedBlockingQueueとArrayBlockingQueueはFIFOキュー、PriorityBlockingQueueは優先順位順のキューです.BlockingQueueを使用して構築された生産者と消費例:
消費者:
public class Consumer implements Runnable {

	private BlockingQueue<Food> queue;
	private ExecutorService exec;

	public Consumer(BlockingQueue<Food> queue, ExecutorService exec) {
		this.queue = queue;
		this.exec = exec;
	}

	@Override
	public void run() {
		while (!exec.isShutdown()) {
			try {
				Thread.sleep(2000);
				Food food = queue.take();
				System.out.println("Consumer " + food);
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (RejectedExecutionException e) {

			}
		}
	}
}

生産者:
public class Producer implements Runnable {

	private BlockingQueue<Food> queue;
	private ExecutorService exec;

	public Producer(BlockingQueue<Food> queue, ExecutorService exec) {
		this.queue = queue;
		this.exec = exec;
	}

	@Override
	public void run() {
		while (!exec.isShutdown()) {
			Food food = new Food();
			try {
				Thread.sleep(4000);
				queue.put(food);
				System.out.println("Produce " + food);
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (RejectedExecutionException e) {

			}
		}
	}
}

 Main:
		BlockingQueue<Food> queue = new ArrayBlockingQueue<Food>(5);
		ExecutorService exec = Executors.newFixedThreadPool(3);
		Producer p1 = new Producer(queue, exec);
		Producer p2 = new Producer(queue, exec);

		Consumer c1 = new Consumer(queue, exec);
	
		exec.execute(p1);
		exec.execute(p2);
		exec.execute(c1);
		try {
			Thread.sleep(10000);
		} catch (InterruptedException ignored) {
		}
		exec.shutdown();

四、CountDownLatch
ロック(Latch)は、スレッドの進行を遅らせ、スレッドが終了状態に達したことを知ることができます.閉鎖動作はドアのようなもので、閉鎖がゴール状態に達するまでドアは閉じられています.終点ステータスが到着すると、ブロックされたすべてのスレッドが通過します.CountDownLatchは、カウンタを終点状態として使用し、カウンタの値が0に達したことを知っていると、ロックが開きます.awaitメソッドを呼び出すと、スレッドはカウンタが0であることを知り、countDownメソッドはカウンタを1つ減らします.
閉鎖には2つの一般的な使い方があり、閉鎖を開始し、閉鎖を終了します.ロックの開始は、1つの条件が到着した後にすべてのスレッドが一緒に実行されるのを待つために使用され、ロックの終了は、すべての条件またはすべてのスレッドが終了した後に後続の処理を行うのを待つために使用されます.例:
final CountDownLatch startLatch = new CountDownLatch(1);
final CountDownLatch endLatch = new CountDownLatch(3);
Runnable prepare = new Runnable() {
	@Override
	public void run() {
		try {
			startLatch.await();//      ,        
			System.out.println("    ,    ");
			Random rnd = new Random();
			Thread.sleep(rnd.nextInt(1000));
		} catch (InterruptedException ignored) {
		}
		endLatch.countDown();
	}
};

Thread mum = new Thread(prepare);
Thread dad = new Thread(prepare);
Thread me = new Thread(prepare);
mum.start();
dad.start();
me.start();
startLatch.countDown();
try {
	endLatch.await();
} catch (InterruptedException ignored) {
}
System.out.println("  ");

五、Semaphore、信号量
信号量を用いた同期と反発の制御は最も古典的な同時モデルであり,javaでもサポートが向上している.1つのSemaphoreは有効なライセンスセットを管理し、ライセンスベースの数はコンストラクション関数によって伝達され、acquireメソッドによってライセンスが申請され、ライセンス数が0の場合、スレッドがブロックされます.そうしないと、ライセンス数が1つ減少し、releaseメソッドを使用してライセンスが解放され、ライセンス数が1つ増加します.1つの技術量が1のSemaphoreは2元信号量であり、1つの反発ロックに相当し、再入不可能なロックを表す.信号量制御を使用した同時容器の前回の例:
public class BoundedHashSet<T> {
	private final Set<T> set;
	private final Semaphore sem;

	public BoundedHashSet(int bound) {
		set = Collections.synchronizedSet(new HashSet<T>());
		sem = new Semaphore(bound);
	}

	public boolean add(T o) throws InterruptedException {
		sem.acquire();
		boolean wasAdded = false;
		try {
			wasAdded = set.add(o);
			return wasAdded;
		} finally {
			if (!wasAdded)
				sem.release();
		}
	}

	public boolean remove(Object o) {
		boolean wasRemoved = set.remove(o);
		if (wasRemoved)
			sem.release();
		return wasRemoved;
	}
}

六、CyclicBarrier
レベル(Barrier)は閉鎖に似ており、スレッドのセットをブロックすることができ、いくつかのイベントが発生していることを知ることができます.違いは、すべてのCyclicBarrierが待機しているのは現在のスレッドであり、一定数のスレッドがこの点に達した場合にのみ、同時に通過することができます.共通のバリアポイント(common barrier point)に達するまでスレッドのセットを互いに待つことができます.一定サイズのスレッドのセットに関連するプログラムでは、これらのスレッドは常に互いに待たなければならない.この場合、CyclicBarrierは有用である.このbarrierは、待機スレッドを解放した後に再利用できるため、ループのbarrierと呼ばれる.CyclicBarrierは、一連のスレッドの最後のスレッドが到着した後(ただし、すべてのスレッドを解放する前に)、各バリアポイントでのみ実行されるオプションのRunnableコマンドをサポートします.このバリア操作は、すべての参加スレッドを継続する前に共有状態を更新する場合に便利です.
public class Main {

	public static CyclicBarrier getCyclicBarrier(int count) {
		if (count <= 0)
			return null;
		final CyclicBarrier cyclicBarrier = new CyclicBarrier(count,
				new Runnable() {
					public void run() {
						try {
							Thread.sleep(1000);
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
						System.out.println("conditon is arrive and CycleBarrier is running");
					}
				});
		return cyclicBarrier;
	}

	public static Thread getThread(String nameOfThread,
			final CyclicBarrier cyclicBarrier) {
		Thread thread = new Thread(nameOfThread) {
			public void run() {
				System.out.println(this.getName() +
"is begin; and count is "+ (++count));
				try {
					cyclicBarrier.await();
				} catch (InterruptedException e) {
					e.printStackTrace();
				} catch (BrokenBarrierException e) {
					e.printStackTrace();
				}
				System.out.println(this.getName() + "finished");
			}
		};
		return thread;

	}

	static int count = 0;

	public static void main(String[] args) {
		/** define a cyclicBarrier and number of barrier is 2. */
		CyclicBarrier cyclicBarrier = getCyclicBarrier(2);
		Thread threadOne = getThread("threadOne", cyclicBarrier);
		threadOne.start();
		Thread threadTwo = getThread("threadTwo", cyclicBarrier);
		threadTwo.start();
	}
}

この例では、CyclicBarrierは、2つのスレッドが到着するのを待ってconditon is arrive and CycleBarrier is runningを出力し、2つのスレッドはawaitから返される.
七、明示ロック
 
Java 5の前に、共有オブジェクトのアクセスを調整するメカニズムはsynchronizedとvolatileのみです.JAva 5は、ReentrantLockという新しい選択肢を提供しています.ReentrantLockは、ポーリングやタイミング可能なロック、中断可能なロックなど、より高度な特性を提供します.リード・ロックとライト・ロックをサポートするReentrantReadWriteLock.ReentrantLockを使用するには、finallyブロックでunlockを手動でlockまたは他の操作でロックする必要があります.
ReentrantLock:synchronizedメソッドと文を使用してアクセスする暗黙的なモニタロックと同じ基本的な動作と意味を持つ再入力可能な反発ロックロックですが、より強力な機能を備えています.ReentrantLockを使用して構築された同期Map:
public class LockedMap<K, V> {
	private Map<K, V> map;
	private Lock lock = new ReentrantLock();
	
	public LockedMap(Map<K, V> map) {
		this.map = map;
	}

	public V get(K key) {
		lock.lock();
		try {
			return map.get(key);
		} finally {
			lock.unlock();
		}
	}

	public void put(K key, V value) {
		lock.lock();
		try {
			map.put(key, value);
		} finally {
			lock.unlock();
		}
	}
}
public class ReentrantLockTest {

    private List<Integer> numbers = new ArrayList<Integer>();
    private Lock numbersLock = new ReentrantLock();

    public void addNumbers(int num) {
        try {
            numbersLock.lock();
            numbers.add(num);
        } finally {
            numbersLock.unlock();
        }
    }

    public void outputNumbers() {
        try {
            if (numbersLock.tryLock(1, TimeUnit.SECONDS)) {
                for (int num : numbers) {
                    System.out.println(num);
                }
            }
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        } finally {
            numbersLock.unlock();
        }
    }    

    public static void main(String[] args) {
        final ReentrantLockTest test = new ReentrantLockTest();
        Executor pool = Executors.newFixedThreadPool(3);
        pool.execute(new Runnable() {

            public void run() {
                Random rnd = new Random();
                while (true) {
                    int number = rnd.nextInt();
                    test.addNumbers(number);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException ignored) {
                    }
                }
            }
        });

        pool.execute(new Runnable() {

            public void run() {
                while (true) {
                    test.outputNumbers();
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException ignored) {
                    }
                }
            }
        });
    }


}
 
ReentrantReadWriteLockは、リード・ロックとライト・ロックのサポートを提供し、同じ時点で複数のリード・ロックを許可することができますが、1つのライト・ロックのみを許可し、リード・ロックの取得とライト・ロックの取得は反発します.ReentrantReadWriteLockオブジェクトのreadLockメソッドから対応するリードロックが得られ、writeLockメソッドは対応するライトロックが得られる.ReentrantReadWriteLockによって構築されたMapを使用して、複数のgetアクションを同時に実行できます.
public class ReadWriteMap<K,V>  {
	private Map<K,V> map;
	private ReadWriteLock lock = new ReentrantReadWriteLock();
	private Lock readLock = lock.readLock();
	private Lock writeLock  = lock.writeLock();	
	
	public ReadWriteMap(Map<K,V> map){
		this.map = map;
	}
	
	public V get(K key){
		readLock.lock();
		try{
			return map.get(key);
		}
		finally{
			readLock.unlock();
		}	
	}
	
	public void put(K key,V value){
		writeLock.lock();
		try{
			map.put(key, value);
		}
		finally{
			writeLock.unlock();
		}
	}
	
}
 
すべてのコードは添付ファイルを参照してください.本文は『Java同時プログラミング実践』を参照する.