Javaパラレル開発ノート4


信頼できないキャンセル操作は、生産者をブロックされた操作に配置します.
 
	class BrokenPrimeProducer extends Thread{
		private final BlockingQueue<BigInteger> queue;
		private volatile boolean cancelled = false;
		
		BrokenPrimeProducer(BlockingQueue<BigInteger> queue){
			this.queue = queue;
		}
		
		public void run(){
			try{
				BigInteger p = BigInteger.ONE;
				while(!cancelled)
					queue.put(p = p.nextProbablePrime());
			}catch(InterruptedException consumed){
				
			}
		}
		
		public void cancel(){ cancelled = true;}
	}
	
	void consumePrimes() throws InterruptedException {
		BlockingQueue<BigInteger> primes = ...;
		BrokenPrimeProducer producer = new BrokenPrimeProducer(primes);
		producer.start();
		try{
			while(needMorePrimes()){
				consume(primes.take());
			}
		}finally{
			producer.cancel();
		}
	}

 上記のコードでは、生産者スレッドが素数を生成し、ブロックキューに入れます.生産者の速度が消費者の処理速度を超えると、キューが満たされ、putメソッドもブロックされます.生産者がputメソッドでブロックされている場合、消費者が生産タスクをキャンセルしたい場合、どのような状況が発生しますか?cancelメソッドを呼び出してcancelledフラグを設定できますが、生産者はブロックされたputメソッドから回復できないため、このフラグをチェックすることはできません(消費者はキューから素数を取り出すのを停止しているため、putメソッドはブロック状態を保存し続けます).
  上記の問題を解決するために、キャンセル操作の代わりにスレッド中断を行うことができ、具体的なコードは以下の通りである.
class BrokenPrimeProducer extends Thread{
		private final BlockingQueue<BigInteger> queue;
		
		BrokenPrimeProducer(BlockingQueue<BigInteger> queue){
			this.queue = queue;
		}
		
		public void run(){
			try{
				BigInteger p = BigInteger.ONE;
				while(!Thread.currentThread().isInterrupted())
					queue.put(p = p.nextProbablePrime());
			}catch(InterruptedException consumed){
				
			}
		}
		
		public void cancel(){ interrupt();}
	}