Java同時実行(特定のインスタンス):効率的で伸縮性の高い結果キャッシュの構築

5426 ワード

この例は『Java同時プログラミング実戦』第5章から来ている.本稿では,最初に最も簡単なHashMapから構築し,その後その同時欠陥を解析して一歩一歩修復する効率的で伸縮性のあるキャッシュを開発する.
hashMapバージョン
まず,computableインタフェースを定義し,compute()メソッドを含み,時間のかかる数値計算メソッドである.Memoizer 1は、hashMapを使用して以前に計算した結果を保存する最初のバージョンのキャッシュです.computeメソッドでは、まず必要な結果がキャッシュにあるかどうかを確認し、存在する場合は前に計算した値を返します.そうしないと、再計算して結果をHashMapにキャッシュしてから戻ります.
interface Computable<A, V> {
	V compute(A arg) throws InterruptedException;// 
}

public class Memoizer1<A, V> implements Computable<A, V> {
	private final Map<A, V> cache = new HashMap<A, V>();
	private final Computable<A, V> c;

	public Memoizer1(Computable<A, V> c) {
		this.c = c;
	}

	public synchronized V compute(A arg) throws InterruptedException {
		V result = cache.get(arg);
		if (result == null) {
			result = c.compute(arg);
			cache.put(arg, result);
		}
		return result;
	}
}

HashMapはスレッドが安全ではないため、2つのスレッドが同時にHashMapにアクセスしないことを確保するには、Memoizer 1は、メソッド全体を同期する保守的な方法を採用しています.この方法はスレッドのセキュリティを確保することができるが、computeを実行できるスレッドが毎回1つしかないという明らかな伸縮性の問題をもたらす.
ConcurrentHashMapバージョン
HashMapの代わりにConcurrentHashMapを使用してMemoizer 1の悪いものを改善し、リリースすることができます.ConcurrentHashMapはスレッドが安全であるため、下位のMapにアクセスする際に同期する必要がないため、compute()メソッドの同期によるシリアル性は回避されます.
public class Memoizer2 <A, V> implements Computable<A, V> {
    private final Map<A, V> cache = new ConcurrentHashMap<A, V>();
    private final Computable<A, V> c;

    public Memoizer2(Computable<A, V> c) {
        this.c = c;
    }

    public V compute(A arg) throws InterruptedException {
        V result = cache.get(arg);
        if (result == null) {
            result = c.compute(arg);
            cache.put(arg, result);
        }
        return result;
    }
}

しかし、このバージョンのキャッシュには問題があります.スレッドAがコストのかかる計算を開始し、他のスレッドがこのスレッドが進行していることを知らない場合は、この計算を繰り返す可能性があります.
FutureTaskバージョン1
最終計算結果ではなくmapにFutureオブジェクトを格納できます.Futureオブジェクトはプレースホルダに相当し、結果が計算中であることをユーザーに伝えます.最終結果を得るにはget()メソッドを呼び出します.Futureのget()メソッドは、結果が計算中の場合、結果の計算が完了するまでブロックされ、戻ります.結果が計算済みであれば、そのまま戻ります.
public class Memoizer3<A, V> implements Computable<A, V> {
	private final Map<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>();
	private final Computable<A, V> c;

	public Memoizer3(Computable<A, V> c) {
		this.c = c;
	}

	public V compute(final A arg) throws InterruptedException {
		Future<V> f = cache.get(arg);
		if (f == null) {
			Callable<V> eval = new Callable<V>() {
				public V call() throws InterruptedException {
					return c.compute(arg);
				}
			};
			FutureTask<V> ft = new FutureTask<V>(eval);
			f = ft;
			cache.put(arg, ft);
			ft.run(); // call to c.compute happens here
		}
		try {
			return f.get();
		} catch (ExecutionException e) {
			cache.remove(arg);
		}
		return null;
	}
}

Memoizer 3は前のバージョンの問題を解決し、他のスレッドが計算結果を持っている場合、新しいスレッドはこの結果が計算されるのを待っていますが、彼のもう一つの欠陥は、2つのスレッドが同じ値を計算する脆弱性があることです.これは典型的な「先にチェックしてから実行する」競合条件エラーであり、mapに結果があるかどうかを確認し、存在しない場合は新しい値を計算します.これは原子操作ではないので、2つのスレッドが同じ時間にcomputeを呼び出して同じ値を計算する可能性があります.
FutureTaskバージョン2
Memoizer 3がこの問題を抱えているのは,複合操作「なければ追加」が原子性を持たないため,ConcurrentMapにおける原子法putIfAbsentを用いてMemoizer 3の脆弱性を回避できるためである.
public class Memoizer <A, V> implements Computable<A, V> {
    private final ConcurrentMap<A, Future<V>> cache
            = new ConcurrentHashMap<A, Future<V>>();
    private final Computable<A, V> c;

    public Memoizer(Computable<A, V> c) {
        this.c = c;
    }

    public V compute(final A arg) throws InterruptedException {
        while (true) {
            Future<V> f = cache.get(arg);
            if (f == null) {
                Callable<V> eval = new Callable<V>() {
                    public V call() throws InterruptedException {
                        return c.compute(arg);
                    }
                };
                FutureTask<V> ft = new FutureTask<V>(eval);
                f = cache.putIfAbsent(arg, ft);
                if (f == null) {
                    f = ft;
                    ft.run();
                }
            }
            try {
                return f.get();
            } catch (CancellationException e) {
                cache.remove(arg, f);
            } catch (ExecutionException e) {
                throw LaunderThrowable.launderThrowable(e.getCause());
            }
        }
    }
}

キャッシュ・オブジェクトが値ではなくFutureの場合、計算がキャンセルされたり失敗したりする可能性があるため、キャッシュ汚染の問題が発生します.この場合、mapからオブジェクトを削除し、再計算する必要があります.このキャッシュシステムの使用は非常に簡単で、1つのComputableオブジェクトを入力するだけでキャッシュを構築することができ、計算結果を得るにはcompute()メソッドを呼び出すと、計算結果をキャッシュします.
まとめ
最初のバージョンに比べて、最終バージョンは性能が大幅に向上し、ConcurrentHashMapの使用は方法全体のロックを回避した.FutureTaskの使用により、計算が非同期化され、現在のスレッド計算が進行中であることを1つのFutureオブジェクトで伝えると、計算の重複が大幅に回避されます.