同時キャッシュ:『JCP』を読む

6452 ワード

キャッシュメソッドは、プログラミングでよく遭遇します.たとえば、複雑に計算された値ですが、計算されると変化しなくなり、キャッシュで保存できます.最も簡単な書き方は以下の通りです.

Object value = null;
if ( (value = cache.get(key)) == null ) {
   value = compteValue(key);
  }
cache.put(key, value);

マルチスレッドを考慮しなければ、これも問題ないように見えます.実際のアプリケーションでは、このように書くことはできません.
まず、キャッシュの出現は、cpuの消費を減らすため、つまり、compteValueという方法は時間がかかるはずです(簡単に取得すれば、キャッシュを使う必要もありません).次に、このキャッシュに複数のスレッドが同時にアクセスする可能性があります.つまり、頻繁に使用されるため、ここでは、並列に発行されたキャッシュについて説明します.
『JCP』で示した例は逐次漸進的で、とても良いです.concurrentパッケージのいくつかのクラスの使用も見ることができます.次はこれらの例をここに置いて、勉強に供します.

public class Memoier1<K, V> implements Computable<K, V> {
	private final Map<K, V> cache = new HashMap<K, V>();

	private final Computable<K, V> c;

	public Memoier1(Computable<K, V> c) {
		this.c = c;
	}

	@Override
	public synchronized V compute(K key) throws InterruptedException {

		V result = cache.get(key);
		if (result == null) {
			result = c.compute(key);
			cache.put(key, result);
		}
		return result;

	}
}

見覚えがあるでしょう.使いましたが
synchronizedですが、このような同期はより低いパフォーマンスをもたらすだけです.synchronizedはこの時方法全体をロックしたからです.つまり,各呼び出しスレッドはキュー呼び出ししかできないのではないか.性能改善のためにConcurrentHashMapを考えました

public class Memoier2<K, V> implements Computable<K, V> {
	private final Map<K, V> cache = new ConcurrentHashMap<K, V>();

	private final Computable<K, V> c;

	public Memoier1(Computable<K, V> c) {
		this.c = c;
	}

	@Override
	public V compute(K key) throws InterruptedException {

		V result = cache.get(key);
		if (result == null) {
			result = c.compute(key);
			cache.put(key, result);
		}
		return result;

	}
}

これで我々はConcurrentHashMapの特性を利用し,compute法を並列に行うことができるようになった.しかし、これは依然として完璧ではありません.computeの方法は時間がかかるかもしれません.すなわち,あるスレッドがcomputeメソッドを呼び出すと,別のスレッドも呼び出されるが,前のスレッドの状態を知らない.だから彼もcomputeメソッドを呼び出して、このようにして、キャッシュは意味がありません.私たちはcomputeを一度だけ望んでいます.問題は、スレッド間で共有できるステータスがないことです.次のスレッドは、このkey上で同じスレッドが実行されていることをどのように知っていますか?
これはFutureが役に立ちました.Futureはスレッド実行の結果をキャプチャできます.

import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class Memoier3<K,V> implements Computable<K,V>{
	private final Map<K,Future<V>> cache = new ConcurrentHashMap<K,Future<V>>();
	private final Computable<K, V> c;
	
	public Memoier3(Computable<K,V> c){
		this.c = c;
	}

	@SuppressWarnings("rawtypes")
	@Override
	public V compute(final K k) throws InterruptedException {
		// TODO Auto-generated method stub
		Future<V> f = cache.get(k);
		if(f == null){
			 FutureTask<V> ft = new FutureTask<V>(new Callable<V>(){

				@Override
				public V call() throws Exception {
					return c.compute(k); 
				}
				 
			 });
			 cache.put(k, ft);
			 f = ft;
			 ft.run();
		}	
		try{
			return f.get();
		}catch(ExecutionException e){
			throw lanuderThrowable(e.getCause());
		}
	}
	
	public static RuntimeException lanuderThrowable(Throwable t){
		if( t instanceof RuntimeException ){
			return (RuntimeException)t;
		}else if(t instanceof Error){
			throw new Error();
		}else
			throw new IllegalStateException("Not Unchecked", t);
		
	}

}

これでだいぶよくなったように見えます.しかし、著者はいくつかの問題を見つけた.まず、両方のスレッドがcompute関数にアクセスし、

if ( f == null ){
   ...
 put(k,v)
}

このような(check-then-act)スレッドは安全ではありません.次に、Futureが失敗すると、私たちが得るたびに間違った結果(cache pollution)、最後にcacheタイムアウト、新旧置換などの問題があります(著者はただ、どのように実現するかを自分で考えることができます).
次は「最終」コードです

package book.jcp.basic;

import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class Memoier<K, V> implements Computable<K, V> {
	private final ConcurrentHashMap<K, Future<V>> cache = new ConcurrentHashMap<K, Future<V>>();
	private final Computable<K, V> c;

	public Memoier(Computable<K, V> c) {
		this.c = c;
	}

	@SuppressWarnings("rawtypes")
	@Override
	public V compute(final K k) throws InterruptedException {
		while (true) {
			Future<V> f = cache.get(k);
			if (f == null) {
				FutureTask<V> ft = new FutureTask<V>(new Callable<V>() {

					@Override
					public V call() throws Exception {
						// TODO Auto-generated method stub
						return c.compute(k);
					}

				});
				f = cache.putIfAbsent(k, ft);
				if (f == null) {
					f = ft;
					ft.run();
				}
			}
			try {
				return f.get();
			} catch (CancellationException e) {
				cache.remove(k);
			} catch (ExecutionException e) {
				throw lanuderThrowable(e.getCause());
			}
		}
	}

	public static RuntimeException lanuderThrowable(Throwable t) {
		if (t instanceof RuntimeException) {
			return (RuntimeException) t;
		} else if (t instanceof Error) {
			throw new Error();
		} else
			throw new IllegalStateException("Not Unchecked", t);

	}

}


最初の問題を解決するためにputIfAbsentを用い,この点が2回計算されないようにしたが,キャンセル(失敗)されたFutureでは異常をキャプチャした後にFutureを除去することで実現した.
最後に述べたように、キャッシュが期限切れになったら、みんなで方法を考えるしかありません.
リースパラメータを追加し、専用のスレッドを使用してキャッシュをスキャンするしかないと思います.