JavaでWeakReference、SoftReference、ConcurrentHashMapを使用してローカルキャッシュを構築する(高並行性とクラスタ環境に対応)

48511 ワード

はじめに(個人的な理解):キャッシュは一般に、処理時間やリソース消費の問題を緩和するために使用されます。ローカルキャッシュ(JVM内キャッシュ)は、ネットワーク接続を介してアクセスするリモートキャッシュ(Redisなど)と比べると、主に処理時間の問題を緩和するために使用されます。ローカルキャッシュは実装が容易であり、一方リモートキャッシュは格納できるオブジェクトの種類に制約があります(シリアライズ/デシリアライズで解決する必要があり、これも本質的には処理時間の問題です)。
Referenceが参照するオブジェクトの生存時間について:
WeakReference:オブジェクトTを指す強参照が存在しなくなった場合、オブジェクトTが生存できるのは次回のGCまでです。
SoftReference:オブジェクトTを指す強参照が存在しなくなった場合でも、オブジェクトTはメモリ不足(メモリオーバーフローが発生する直前)になるまで回収されません(Full GCが1回実行されるまでの時間?)。
そのため、どちらのタイプのキャッシュを使用するかは、自身の業務要件に応じて選択してください。
注意:以下の実装はjava.lang.reflect.WeakCacheを模倣したものです。
まずは基底クラスです。データアクセスのロジックはここにカプセル化されており、サブクラスは具体的な種類のReferenceを提供するだけで済みます。WeakReference/SoftReferenceを直接使用する場合とは異なり、この実装はkey-value形式(keyは強参照、valueは弱参照/ソフト参照)であるため、同じ種類のオブジェクトをキーごとに複数キャッシュする必要がある場面に適しています。
import java.lang.ref.ReferenceQueue;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.function.Supplier;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Cache mapping a {@code key -> value}. Values are weakly (or softly) referenced while keys are
 * strongly referenced.
 * Keys are passed directly to the {@link #get} method. Values are calculated from keys using the
 * {@code valueFactory} function passed to the constructor. Keys cannot be {@code null} and are
 * compared by {@code equals}; values returned by {@code valueFactory} may be {@code null} (a
 * {@code null} result is handed back to the caller but never stored) and are compared by identity.
 * Entries whose reference has been cleared and enqueued are expunged lazily on invocation of
 * {@link #get} or {@link #remove}.
 *
 * <p>When several threads miss on the same key concurrently, {@code valueFactory} may be invoked
 * more than once for that key; only one of the results ends up in the cache.
 *
 * <p>This class is modelled on {@code java.lang.reflect.WeakCache}.
 *
 * @param <K> type of keys
 * @param <V> type of values
 */
public abstract class ReferenceCache<K, V> {

    /** Receives the {@link Value} references whose referent has been cleared by the collector. */
    private final ReferenceQueue<V> refQueue = new ReferenceQueue<>();

    /** Strong keys mapped to weak/soft value holders. */
    private final ConcurrentMap<K, Value<K, V>> map = new ConcurrentHashMap<>();

    private final Function<K, V> valueFactory;

    /**
     * Constructs an instance of {@code ReferenceCache}.
     *
     * @param valueFactory a function mapping a {@code key -> value}
     * @throws NullPointerException if {@code valueFactory} is null.
     */
    protected ReferenceCache(Function<K, V> valueFactory) {
        this.valueFactory = Objects.requireNonNull(valueFactory);
    }

    /**
     * Looks up the value through the cache, computing and storing it on a miss.
     *
     * @param key the key, must not be {@code null}
     * @return the cached or freshly computed value; {@code null} when {@code valueFactory}
     *         produced {@code null} for this key
     * @throws NullPointerException if {@code key} passed in is null.
     */
    public final V get(K key) {
        Objects.requireNonNull(key);
        expungeStaleEntries();

        Value<K, V> cache = map.get(key);
        Value<K, V> newCache = null;
        // Strong reference to the freshly computed value. It keeps the referent of newCache
        // alive until this method returns; without it a weakly referenced value could be
        // collected between being stored and being handed back to the caller.
        V created = null;

        while (true) {
            if (cache != null) {
                V value = cache.get();
                if (value != null) {
                    return value;
                }
                // otherwise the holder was cleared and must be replaced below
            }

            // lazily compute the value and wrap it exactly once per call
            if (newCache == null) {
                created = valueFactory.apply(key);
                if (created == null) {
                    // null results are returned but never cached
                    return null;
                }
                newCache = newValue(key, created, refQueue);
            }

            if (cache == null) {
                cache = map.putIfAbsent(key, newCache);
                if (cache == null) {
                    // our holder was installed
                    return created;
                }
                // another thread installed a holder first: loop and inspect it
            } else if (map.replace(key, cache, newCache)) {
                // the cleared holder was replaced with ours
                return created;
            } else {
                // lost a race: retry with whatever is mapped now (possibly nothing)
                cache = map.get(key);
            }
        }
    }

    /**
     * Expunges all stale cache entries, i.e. those whose holder has been enqueued.
     */
    @SuppressWarnings("unchecked") // only Value instances are ever registered with refQueue
    private void expungeStaleEntries() {
        Value<K, V> stale;
        while ((stale = (Value<K, V>) refQueue.poll()) != null) {
            // Removing by key AND mapped value is safe: a cleared Value is only equal to
            // itself, so a newer holder stored under the same key is left untouched.
            map.remove(stale.getKey(), stale);
        }
    }

    /**
     * Removes all of the mappings from this cache.
     * The cache will be empty after this call returns.
     */
    public final void clear() {
        // drain pending notifications; their entries are dropped by map.clear() anyway
        while (refQueue.poll() != null) {
            // nothing to do per element
        }
        map.clear();
    }

    /**
     * Removes the key (and its corresponding value) from this cache.
     * This method does nothing if the key is not in the cache.
     *
     * @param key the key that needs to be removed
     * @return the previous value associated with {@code key}, or
     *         {@code null} if there was no mapping for {@code key}
     *         (or the value was already cleared)
     * @throws NullPointerException if the specified key is null
     */
    public final V remove(Object key) {
        expungeStaleEntries();
        Value<K, V> val = map.remove(key);
        return val != null ? val.get() : null;
    }

    /**
     * Constructs a new {@code Value} (WeakReference or SoftReference)
     * with specified key, value and ReferenceQueue.
     *
     * @param key      the key
     * @param value    the value
     * @param refQueue the ReferenceQueue the new holder must register with
     * @return a new Value
     */
    protected abstract Value<K, V> newValue(K key, V value, ReferenceQueue<V> refQueue);

    /**
     * Common type of value suppliers that are holding a referent.
     * The {@link #equals} and {@link #hashCode} of implementations are expected
     * to compare the referent by identity, and a cleared Value is only equal to itself.
     *
     * @param <K> type of keys
     * @param <V> type of values
     */
    protected interface Value<K, V> extends Supplier<V> {

        /**
         * Gets the key this value is stored under.
         *
         * @return key
         */
        K getKey();
    }
}


ソフトリファレンスSoftReferenceの実装:
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.util.Objects;
import java.util.function.Function;

/**
 * Cache mapping a {@code key -> value}. Values are softly referenced while keys are strongly
 * referenced.
 * Keys are passed directly to the {@link #get} method. Values are calculated from keys using the
 * {@code valueFactory} function passed to the constructor. Keys cannot be {@code null} and are
 * compared by {@code equals}, while values returned by {@code valueFactory} can be {@code null}
 * and are compared by identity.
 * Entries are expunged from the cache lazily, on invocation of {@link #get}, once the
 * {@link SoftReference} to their value has been cleared.
 *
 * @param <K> type of keys
 * @param <V> type of values
 */
public final class SoftCache<K, V> extends ReferenceCache<K, V> {

    /**
     * Constructs an instance of {@code SoftCache}.
     *
     * @param valueFactory a function mapping a {@code key -> value}
     * @throws NullPointerException if {@code valueFactory} is null.
     */
    public SoftCache(Function<K, V> valueFactory) {
        super(valueFactory);
    }

    /**
     * Creates a new {@code Value} backed by a {@link SoftReference}.
     */
    @Override
    protected Value<K, V> newValue(K key, V value, ReferenceQueue<V> refQueue) {
        return new CacheValue<>(key, value, refQueue);
    }

    /**
     * CacheValue containing a softly referenced {@code value}. It registers
     * itself with the {@code refQueue} so that it can be used to expunge
     * the entry when the {@link SoftReference} is cleared.
     */
    private static final class CacheValue<K, V> extends SoftReference<V> implements Value<K, V> {

        /** Identity hash of the referent, captured up front so it survives clearing. */
        private final int hash;
        private final K key;

        private CacheValue(K key, V value, ReferenceQueue<V> refQueue) {
            super(value, refQueue);
            this.hash = System.identityHashCode(value);  // compare by identity
            this.key = Objects.requireNonNull(key);
        }

        @Override
        public int hashCode() {
            return hash;
        }

        @Override
        public boolean equals(Object obj) {
            V value;
            return obj == this ||
                   obj != null &&
                   obj.getClass() == this.getClass() &&
                   // cleared CacheValue is only equal to itself
                   (value = this.get()) != null &&
                   // compare value by identity
                   value == ((CacheValue<?, ?>) obj).get();
        }

        @Override
        public K getKey() {
            return key;
        }
    }
}

弱参照WeakReferenceの実装:
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.Objects;
import java.util.function.Function;


/**
 * Cache mapping a {@code key -> value}. Values are weakly referenced while keys are strongly
 * referenced.
 * Keys are passed directly to the {@link #get} method. Values are calculated from keys using the
 * {@code valueFactory} function passed to the constructor. Keys cannot be {@code null} and are
 * compared by {@code equals}, while values returned by {@code valueFactory} can be {@code null}
 * and are compared by identity.
 * Entries are expunged from the cache lazily, on invocation of {@link #get}, once the
 * {@link WeakReference} to their value has been cleared.
 *
 * @param <K> type of keys
 * @param <V> type of values
 */
public final class WeakCache<K, V> extends ReferenceCache<K, V> {

    /**
     * Constructs an instance of {@code WeakCache}.
     *
     * @param valueFactory a function mapping a {@code key -> value}
     * @throws NullPointerException if {@code valueFactory} is null.
     */
    public WeakCache(Function<K, V> valueFactory) {
        super(valueFactory);
    }

    /**
     * Creates a new {@code Value} backed by a {@link WeakReference}.
     */
    @Override
    protected Value<K, V> newValue(K key, V value, ReferenceQueue<V> refQueue) {
        return new CacheValue<>(key, value, refQueue);
    }

    /**
     * CacheValue containing a weakly referenced {@code value}. It registers
     * itself with the {@code refQueue} so that it can be used to expunge
     * the entry when the {@link WeakReference} is cleared.
     */
    private static final class CacheValue<K, V> extends WeakReference<V> implements Value<K, V> {

        /** Identity hash of the referent, captured up front so it survives clearing. */
        private final int hash;
        private final K key;

        private CacheValue(K key, V value, ReferenceQueue<V> refQueue) {
            super(value, refQueue);
            this.hash = System.identityHashCode(value);  // compare by identity
            this.key = Objects.requireNonNull(key);
        }

        @Override
        public int hashCode() {
            return hash;
        }

        @Override
        public boolean equals(Object obj) {
            V value;
            return obj == this ||
                   obj != null &&
                   obj.getClass() == this.getClass() &&
                   // cleared CacheValue is only equal to itself
                   (value = this.get()) != null &&
                   // compare value by identity
                   value == ((CacheValue<?, ?>) obj).get();
        }

        @Override
        public K getKey() {
            return key;
        }
    }
}

使用例:(例えば、ユーザーidに基づいて対応する情報を検索し、キャッシュに該当する情報が存在しない場合はデータベースから取得してキャッシュに格納する。キャッシュするオブジェクトは通常、不変であるか、頻繁には変更されない情報であるべきである)
/**
 * Minimal usage example: values are looked up by id through a {@code SoftCache} and loaded via
 * {@link #getFromDB(Long)} on a cache miss.
 */
public class Test {

    // Object stands in for the cached value type; substitute the concrete bean type.
    private static final SoftCache<Long, Object> cache =
            new SoftCache<>(id -> Test.getFromDB(id));

    /**
     * Loads the value for the given id. Used as the cache's value factory.
     *
     * @param id the id to look up
     * @return the loaded value; this placeholder always returns {@code null}
     */
    private static Object getFromDB(Long id) {
        // TODO: perform the real lookup here. A null result is returned to the caller
        // by the cache but is not stored.
        return null;
    }

    /**
     * Returns the value for the given id, consulting the cache first.
     *
     * @param id the id to look up
     * @return the cached or freshly loaded value, or {@code null} when the value factory
     *         ({@link #getFromDB(Long)}) produced {@code null}
     * @throws NullPointerException if specified id is null
     */
    public static Object get(Long id) {
        return cache.get(id);
    }
}

並行性が高い場合、valueFactory.apply(key)が複数回呼び出される可能性があります。valueFactory.apply(key)による値の計算(この例ではgetFromDB(id)の呼び出し)に時間やリソースがかかる場合は、計算/呼び出しを実行するスレッドを同時に1つだけに制限し、他のスレッドにはその結果を待たせる必要があります。コードは次のとおりです。
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class Test {

	private static final ConcurrentMap>
	LOCK = new ConcurrentHashMap<>();
	
	//Object          Bean  
	private static final SoftCache cache = 
			new SoftCache<>(id -> Test.getFromDB(id));
	
	/**
	 *               
	 * @param id   
	 * @return      
	 */
	// 《Java      》5.6 -             
	private static Object getFromDB(Long id){
		Future f = LOCK.get(id);
		if(f==null){
			Callable eval = new GetFromDBCallable(id);
			FutureTask ft = new FutureTask(eval);
			f = LOCK.putIfAbsent(id, ft);
			if(f==null){//successfully put
				f = ft;
				ft.run();
			}
		}
		try{
			return f.get();
		} catch (InterruptedException | ExecutionException e) {
			//              throw RuntimeException or return null
			e.printStackTrace();
			//TODO            null
			return null;
		} finally{
			//                   
			LOCK.remove(id, f);
		}
	}
	
	private static class GetFromDBCallable implements Callable{

		private final long id;
		public GetFromDBCallable(long id) {
			this.id = id;
		}
		
		@Override
		public Object call() throws Exception {
			return GetFromDBCallable.doGetFromDB(id);
		}
		
		private static Object doGetFromDB(long id){
			//           ,           
			//TODO            null      
			return null;
		}
	}
	
	/**
	 * 
	 * @param id   id
	 * @return       
	 * @throws NullPointerException if specified id is null
	 */
	//       ,       ,                    
	public static Object get(Long id){
		//         null       valueFactory(   Test.getFromDB(id))    (    null          return    )
		return cache.get(id);
	}
}

ReferenceCache.mapには最終的な値(valueFactoryが計算した後の値)が格納されるため、ReferenceCache自身はputIfAbsent()であらかじめ場所を確保するという方法を取れず、並行環境下でvalueFactory.apply(key)が複数回呼び出される問題を回避できません。そのため、外側にもう1層(上記の例を参照)かぶせて解決するしかありません。
もう1つの問題は、キャッシュ対象の情報が変更された場合に、対応するキャッシュを更新する必要があることです。単一ノード環境では問題ありません。cache.remove(id)をラップしたメソッドを直接呼び出せば済みます。クラスタ(マルチノード)環境では、他のノードのキャッシュをどのように更新するかを考慮する必要があります。1つの方法はバージョン番号のポーリングです。キャッシュに対して中央バージョン番号を用意し、更新のたびに中央バージョン番号を+1します。呼び出し側は呼び出すたびにローカルバージョン番号を中央バージョン番号と比較し、一致しない場合はローカルキャッシュを更新して、ローカルバージョン番号を中央バージョン番号の値に合わせます。ポイントは、毎回中央バージョン番号を取得する必要があるという点です。もう1つの方法は、Redisのサブスクライブ/パブリッシュ機能のようなメッセージ通知方式を使用することです。コードは次のとおりです。
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.function.Consumer;

import com.test.redis.RedisSubscribeThreadManager;
import com.test.redis.ReidsMessageConsumerManager;
import com.test.redis.SubscribeChannel;

public class Test {

	static{
		//     Redis    
		ReidsMessageConsumerManager.register(SubscribeChannel.CACHE_UPDATE,
				new Consumer(){
					/**
					 *            .                id
					 */
					@Override
					public void accept(String message) {
						List ids = getLongsBySplitComma(message);
						for(Long id:ids){
							Test.remove(id);
						}
					}

				});
	}
	
	private static final ConcurrentMap>
	LOCK = new ConcurrentHashMap<>();
	
	//Object          Bean  
	private static final SoftCache cache = 
			new SoftCache<>(id -> Test.getFromDB(id));
	
	/**
	 *               
	 * @param id   
	 * @return      
	 */
	// 《Java      》5.6 -             
	private static Object getFromDB(Long id){
		Future f = LOCK.get(id);
		if(f==null){
			Callable eval = new GetFromDBCallable(id);
			FutureTask ft = new FutureTask(eval);
			f = LOCK.putIfAbsent(id, ft);
			if(f==null){//successfully put
				f = ft;
				ft.run();
			}
		}
		try{
			return f.get();
		} catch (InterruptedException | ExecutionException e) {
			//              throw RuntimeException or return null
			e.printStackTrace();
			//TODO            null
			return null;
		} finally{
			//                   
			LOCK.remove(id, f);
		}
	}
	
	private static class GetFromDBCallable implements Callable{

		private final long id;
		public GetFromDBCallable(long id) {
			this.id = id;
		}
		
		@Override
		public Object call() throws Exception {
			return GetFromDBCallable.doGetFromDB(id);
		}
		
		private static Object doGetFromDB(long id){
			//           ,           
			//TODO            null      
			return null;
		}
	}
	
	/**
	 *       long     List
	 * @param strDigits
	 * @return
	 * @throws RuntimeException             long  
	 */
	private static List getLongsBySplitComma(String strDigits){
		if(strDigits==null||strDigits.isEmpty())
			return Collections.emptyList();
		
		String[] digis = strDigits.split(",");
		List list = new ArrayList<>(digis.length);
		for(String digi:digis){
			try {
				list.add(Long.valueOf(digi));
			} catch (Exception e) {
				throw new RuntimeException("["+digi+"]     long  ");
			}
		}
		return list;
	}
	
	/**
	 * 
	 * @param id   id
	 * @return       
	 * @throws NullPointerException if specified id is null
	 */
	//       ,       ,                    
	public static Object get(Long id){
		//           .           ,         
		RedisSubscribeThreadManager.ensureSubscribeThreadAlive();
		//         null       valueFactory(   Test.getFromDB(id))    (    null          return    )
		return cache.get(id);
	}
	
	/**
	 *         
	 * @param id        id
	 */
	public static void remove(Long id){
		cache.remove(id);
	}
}

Redisのサブスクライブは購読を解除するまでブロックされるため、複数の業務があり、業務ごとにスレッドを1つ起動してサブスクライブすると、やや無駄になる可能性があります(もちろん、具体的な業務次第です)。ここでは、ReidsMessageConsumerManager.register(channel, consumer)を呼び出すことで、複数の購読業務が1つのサブスクライブスレッドを共有できるようにカプセル化しています。メッセージのコンシューマを登録すると、サブスクライブスレッドの起動が自動的にトリガーされます(スレッドが予期せず終了した場合は、一定回数まで自動的に再試行されます)。このように1つのサブスクライブスレッドを共有する方式には、業務ごとにサブスクライブスレッドを起動する場合と比べて、あるメッセージのコンシューマの処理に時間がかかると、後続の未処理メッセージが長く待たされる可能性があるという問題があります。そのため、どちらの方式を採用するかは業務要件によります。
ReidsMessageConsumerManager.java:
package com.test.redis;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/**
 * 
 *    .
* ( {@link #register} ), * {@link #get(String) get} . *
: {@link SubscribeChannel} * ( {@link SubscribeChannel#getChannels()} ) * */ public class ReidsMessageConsumerManager { private static final Map> channelConsumers = new ConcurrentHashMap<>(); static{ String[] channels = SubscribeChannel.getChannels(); if(channels.length>0){ SubscribeManager.subscribe(new SubscribeRunnable(new ReidsMessageListener(), channels)); } } /** * get * @param channel * @param consumer * @throws NullPointerException if any arg is null */ public static void register(String channel,Consumer consumer){ channelConsumers.put(channel, consumer); } /** * ( SubscribeChannel ) * @param channel * @return true ,false * @throws NullPointerException if specified channel is null */ public static boolean unregister(String channel){ if(SubscribeChannel.isDefined(channel)){ return false; } return channelConsumers.remove(channel)!=null; } /** * . null * @param channel * @return Consumer , null * @throws NullPointerException if the specified channel is null */ public static Consumer get(String channel){ return channelConsumers.get(channel); } }

SubscribeRunnable.java:
package com.test.redis;



import java.util.Objects;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.exceptions.JedisException;


/**
 * A {@link ResubscribedRunnable} that subscribes a {@link JedisPubSub} listener to a fixed set
 * of Redis channels using a connection obtained from {@link RedisPool}.
 */
public class SubscribeRunnable implements ResubscribedRunnable {

    private static final Log log = LogFactory.getLog(SubscribeRunnable.class);

    private final JedisPubSub jedisPubSub;

    private final String[] channels;

    /** Stays {@code true} unless {@code subscribe()} returned with the listener unsubscribed. */
    private boolean needToResubscribe = true;

    /**
     * Creates a SubscribeRunnable.
     *
     * @param jedisPubSub the listener that receives the messages
     * @param channels    the channels to subscribe to
     * @throws NullPointerException if any arg is null
     * @throws IllegalArgumentException if channels is empty
     */
    public SubscribeRunnable(JedisPubSub jedisPubSub, String... channels) {
        if (channels.length == 0) {
            throw new IllegalArgumentException("empty channels!");
        }
        this.jedisPubSub = Objects.requireNonNull(jedisPubSub);
        // defensive copy: later changes to the caller's array must not alter the subscription
        this.channels = channels.clone();
    }

    @Override
    public void run() {
        Jedis jedis = null;
        try {
            jedis = RedisPool.get();
            // subscribe() blocks the calling thread until the subscription ends
            jedis.subscribe(jedisPubSub, channels);
            // reached only when subscribe() returned without throwing
            this.needToResubscribe = jedisPubSub.isSubscribed();
        } catch (JedisException e) {
            // needToResubscribe is left untouched, so a retry may follow
            log.error("redis          :"+e.toString(), e);
        } finally {
            RedisPool.close(jedis);
        }
    }

    /**
     * {@inheritDoc}
     *
     * <p>Default implementation does nothing.
     */
    @Override
    public void failToResubscribe(Throwable t) {
    }

    /**
     * Returns true if should continue to resubscribe when abort unexpectedly.
     *
     * @return {@code true} if should continue to resubscribe,
     *         else {@code false}
     */
    @Override
    public boolean isNeedToResubscribe() {
        return needToResubscribe;
    }
}


ResubscribedRunnable.java:
package com.test.redis;

/**
 * A {@link Runnable} subscription task that supports automatic resubscription
 * when it aborts unexpectedly.
 *
 * <p>{@code SubscribeManager} consults {@link #isNeedToResubscribe()} after each
 * run to decide whether the task is queued again, and calls
 * {@link #failToResubscribe(Throwable)} when it is not.
 */
public interface ResubscribedRunnable extends Runnable{

	/**
	 * Method invoked when the task will not be resubscribed any more, i.e. right
	 * before it is given up.
	 * 
	 * @param t the exception that caused termination, as reported to the
	 *          executor's {@code afterExecute} hook (presumably {@code null} when
	 *          the run ended without an uncaught exception)
	 */
	void failToResubscribe(Throwable t);

	/**
	 * Returns true if should continue to resubscribe when abort unexpectedly.
	 *
	 * @return {@code true} if should continue to resubscribe,
     *         else {@code false}
	 */
	boolean isNeedToResubscribe();
}

SubscribeChannel.java:
package com.test.redis;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
 * Definitions of the Redis publish/subscribe channels.
 *
 * <p>Every {@code public static final String} constant declared in this class is treated as
 * a channel name: the constants are collected reflectively when the class is initialized and
 * exposed through {@link #getChannels()} and {@link #isDefined(String)}.
 * {@link ReidsMessageConsumerManager ReidsMessageConsumerManager} subscribes to exactly the
 * channels returned by {@link #getChannels()}, so a new channel only needs a new constant here.
 */
public final class SubscribeChannel {

    /** Channel carrying cache-update notifications. */
    public static final String CACHE_UPDATE = "CACHE_UPDATE";

    /**
     * Tells whether the given name is one of the channels defined in this class.
     *
     * @param channel the channel name to check; {@code null} is never defined
     * @return {@code true} if defined, {@code false} otherwise
     */
    public static boolean isDefined(String channel) {
        return channels.contains(channel);
    }

    /**
     * Returns the names of all channels defined in this class.
     *
     * @return a new array holding every defined channel name; changes to it do not affect
     *         this class
     */
    public static String[] getChannels() {
        return channels.toArray(new String[0]);
    }

    /**
     * The defined channel names. Declared after the constants above on purpose: the static
     * initializer below reads them.
     */
    private static final Set<String> channels;

    static {
        Set<String> values = new HashSet<>();
        // Declared (not inherited) fields only; keep the public static final String ones.
        for (Field field : SubscribeChannel.class.getDeclaredFields()) {
            int mod = field.getModifiers();
            boolean isChannelConstant = Modifier.isPublic(mod)
                    && Modifier.isStatic(mod)
                    && Modifier.isFinal(mod)
                    && field.getType() == String.class;
            if (!isChannelConstant) {
                continue;
            }
            try {
                Object val = field.get(null);
                if (val instanceof String) { // skips constants whose value is null
                    values.add((String) val);
                }
            } catch (IllegalAccessException e) {
                // A silently missing channel would never be subscribed; fail loudly instead.
                throw new IllegalStateException(
                        "cannot read channel constant " + field.getName(), e);
            }
        }
        channels = Collections.unmodifiableSet(values);
    }
}

ReidsMessageListener.java:
package com.test.redis;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

import redis.clients.jedis.JedisPubSub;

/**
 * Redis publish/subscribe listener that dispatches each received message to the consumer
 * registered for its channel in {@link ReidsMessageConsumerManager}.
 *
 * <p>NOTE(review): an exception thrown by a consumer propagates out of {@link #onMessage};
 * confirm how the surrounding subscribe loop is expected to react to that.
 */
public class ReidsMessageListener extends JedisPubSub {

    /**
     * Hands the message to the consumer registered for the channel; messages of channels
     * without a registered consumer are dropped.
     */
    @Override
    public void onMessage(String channel, String message) {
        Consumer<String> consumer = ReidsMessageConsumerManager.get(channel);
        if (consumer != null) {
            consumer.accept(message);
        }
    }

    /**
     * Treats a pattern-matched message like a plain one: dispatch is by the concrete channel,
     * the pattern itself is not used.
     */
    @Override
    public void onPMessage(String pattern, String channel, String message) {
        onMessage(channel, message);
    }

    /**
     * Subscription confirmations are ignored.
     */
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        // ignore
    }

    /**
     * Asks the registry to drop the consumer of the unsubscribed channel.
     */
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        ReidsMessageConsumerManager.unregister(channel);
    }

    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        // ignore
    }

    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
        // ignore
    }
}

RedisMessagePublisher.java:
package com.test.redis;

import java.util.Objects;

import redis.clients.jedis.Jedis;

/**
 * Publishes messages to Redis channels.
 */
public class RedisMessagePublisher {

    /**
     * Publishes a message to the given channel using a connection from {@link RedisPool}.
     * The connection is handed back to the pool whether or not publishing succeeds.
     *
     * @param channel the channel to publish to; must be defined in
     *                {@link com.test.redis.SubscribeChannel SubscribeChannel}
     * @param message the message to publish
     * @throws NullPointerException if any arg is null
     * @throws IllegalArgumentException
     * if the channel haven't been defined in the class {@link com.test.redis.SubscribeChannel SubscribeChannel}
     */
    public static void publish(String channel, String message) {
        Objects.requireNonNull(channel);
        Objects.requireNonNull(message);
        // Enforce the documented contract: only defined channels are subscribed to, so a
        // message sent anywhere else would never be received.
        if (!SubscribeChannel.isDefined(channel)) {
            throw new IllegalArgumentException(
                    "channel is not defined in SubscribeChannel: " + channel);
        }
        Jedis jedis = null;
        try {
            jedis = RedisPool.get();
            jedis.publish(channel, message);
        } finally {
            RedisPool.close(jedis);
        }
    }
}

RedisPool.java:
package com.test.redis;

import redis.clients.jedis.Jedis;

/**
 * Access point for obtaining and releasing {@link Jedis} connections.
 *
 * <p>Both methods are placeholders in this listing: {@link #get()} always returns
 * {@code null} and {@link #close(Jedis)} does nothing. They must be implemented
 * before the classes that call them can work.
 */
public final class RedisPool {

	/**
	 * Obtains a Jedis connection.
	 * @return Jedis; this placeholder always returns {@code null}
	 */
	public static Jedis get(){
		//TODO: obtain a real connection; the placeholder returns null
		return null;
	}
	
	/**
	 * Releases a Jedis connection previously obtained from {@link #get()}.
	 * @param jedis the connection to release; callers pass {@code null} when
	 *              obtaining the connection failed
	 */
	public static void close(Jedis jedis){
		//TODO: release the connection; the placeholder does nothing
	}
}

SubscribeManager.java:
package com.test.redis;

import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


/**
 * 
 * A class that manages the subscribe of redis for resubscribe automatically
 * as need when abort unexpectedly
 *
 */
public class SubscribeManager {
	
	private SubscribeManager(){}
	
	private static final ThreadPoolExecutor EXECUTOR;

	private static final int MAX_SUBSCRIBE_SIZE = 20; //Adjust the size if needed
	
	static{
		ThreadPoolExecutor executor = new ThreadPoolExecutor(
				MAX_SUBSCRIBE_SIZE, MAX_SUBSCRIBE_SIZE, 1L, TimeUnit.NANOSECONDS,
	            new LinkedBlockingQueue(),ThreadFactorys.newFactory("RedisSubscribe")){
			@Override
			public void afterExecute(Runnable r, Throwable t){
				DelegateResubscribedRunnable delegate;
				if( r instanceof DelegateResubscribedRunnable){
					delegate = (DelegateResubscribedRunnable) r;
				}else{
					ResubscribedRunnable sr = (ResubscribedRunnable) r;
					if(!sr.isNeedToResubscribe()){
						sr.failToResubscribe(t);
						return;
					}
					delegate = new DelegateResubscribedRunnable(sr);
				}
				
				boolean needEnqueue = delegate.trySleepAndReturnIsNeedEnqueue();
				if(!needEnqueue||!getQueue().offer(delegate)){
					delegate.failToResubscribe(t);
				}
			}
			
			@Override
			public int prestartAllCoreThreads() {
				// Not allowed
				return 0;
			}
		};
		executor.allowCoreThreadTimeOut(true);
		EXECUTOR = executor;
	}
	
	/**
	 * Stars a thread to do the subscribe using the specified ResubscribedRunnable
	 * and it will resubscribed automatically as need when abort unexpectedly
	 * @param r the {@code ResubscribedRunnable}
	 * @throws NullPointerException if the specified ResubscribedRunnable is null
	 */
	public static void subscribe(ResubscribedRunnable r){
		EXECUTOR.execute(Objects.requireNonNull(r));
	}
	
	/**
	 * Returns the current number of {@code ResubscribedRunnable} in the pool.
	 * @return the number of {@code ResubscribedRunnable}
	 */
	public static int getCurrentSubscribedSize(){
		return EXECUTOR.getActiveCount();
	}
	
	/**
	 * Returns the maximum allowed number of {@code ResubscribedRunnable}.
	 * @return the maximum allowed number of {@code ResubscribedRunnable}.
	 */
	public static int getMaximumSubscribeSize(){
		return MAX_SUBSCRIBE_SIZE;
	}
	
	private static class DelegateResubscribedRunnable implements ResubscribedRunnable{
		private static final long TIME_LIMIT = 1 * 60 * 1000;
		private static final int MAX_RETRY_COUNT = 10;
		private final ResubscribedRunnable delegate;
		private long lastTime;
		private int continuousRetryCount;
		public DelegateResubscribedRunnable(ResubscribedRunnable r) {
			delegate = r;
		}

		@Override
		public void run() {
			delegate.run();
		}
		
		public boolean trySleepAndReturnIsNeedEnqueue(){
			if(!isNeedToResubscribe()){
				return false;
			}
			if(lastTime>0&&System.currentTimeMillis()0){
					if(continuousRetryCount>MAX_RETRY_COUNT){
						return false;
					}
					sleep(30*continuousRetryCount);
				}
			}else{
				continuousRetryCount = 1;
			}
			lastTime = System.currentTimeMillis();
			return true;
		}
		
		private void sleep(int seconds){
			if(seconds<=0){
				return;
			}
			long remaining = seconds*1000;
			final long last = System.currentTimeMillis()+remaining;
			for(;;){
				try {
					if(remaining<=0){
						return;
					}
					Thread.sleep(remaining);
				} catch (InterruptedException e) {
					//ignored
				}
				remaining = last-System.currentTimeMillis();
			}
			
		}

		@Override
		public void failToResubscribe(Throwable t) {
			delegate.failToResubscribe(t);
			
		}

		@Override
		public boolean isNeedToResubscribe() {
			return delegate.isNeedToResubscribe();
		}

		
	}
}

ThreadFactorys.java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Utility class offering a convenient {@code ThreadFactory} implementation.
 * The factory mirrors {@code java.util.concurrent.Executors$DefaultThreadFactory}
 * but names its threads after the {@code threadNamePrefix} supplied by the caller,
 * which makes them easier to recognise.
 */
public final class ThreadFactorys {

    private ThreadFactorys() {
    }

    /**
     * Creates a new {@code ThreadFactory} with specified thread name prefix.
     * Threads are named "{@code threadNamePrefix-PoolThread-n}", where
     * {@code threadNamePrefix} is the argument of this method and {@code n} counts the
     * threads created through the returned factory, starting at 1.
     *
     * @param threadNamePrefix the thread name prefix
     * @return a new {@code ThreadFactory}
     */
    public static ThreadFactory newFactory(String threadNamePrefix) {
        // Same set-up as java.util.concurrent.Executors$DefaultThreadFactory,
        // only the thread name is more specific.
        final String prefix = threadNamePrefix + "-PoolThread-";
        final AtomicInteger sequence = new AtomicInteger(1);
        return task -> {
            Thread worker = new Thread(task, prefix + sequence.getAndIncrement());
            // normalise whatever the new thread inherited from its creator
            if (worker.isDaemon()) {
                worker.setDaemon(false);
            }
            if (worker.getPriority() != Thread.NORM_PRIORITY) {
                worker.setPriority(Thread.NORM_PRIORITY);
            }
            return worker;
        };
    }
}

次のように呼び出すことで、
// Example: broadcast a cache-update notification to every node. The CACHE_UPDATE consumer
// registered in Test parses the message as a comma-separated list of ids (e.g. "1,2,3").
// NOTE(review): the message literal below is a placeholder; its original text appears to
// have been lost from this listing.
RedisMessagePublisher.publish(SubscribeChannel.CACHE_UPDATE, "              ");

クラスタにキャッシュ更新の通知を送信できます。対応するRedisサブスクリプションイベントを登録しておけば、更新通知を受信した際に対応するキャッシュを更新できます。メッセージ通知の信頼性はRedisのサブスクライブ/パブリッシュ機構に依存します。Redisのパブリッシュは、サブスクライブ側のクライアントがメッセージを受信できたかどうかにかかわらず(例えば接続が切断されていた場合でも)一度だけ行われるため、該当ノードのキャッシュが更新されない可能性があります。Redisのサブスクライブ/パブリッシュは100%の到達を保証しませんが、信頼性はおおむね十分なはずです。確実に高い信頼性が必要な場合は、他のメッセージング機構を検討してください。
2つのパラメータから対応するkeyやvalueを生成する(BiFunctionを使う)場面もあります。
BiReferenceCache.java:
import java.lang.ref.ReferenceQueue;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiFunction;
import java.util.function.Supplier;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Cache mapping a {@code key -> value}. values are weakly(or softly) but keys are strongly referenced.
 * Keys are calculated from keys {@code key1}, {@code key2} passed to corresponding methods using the
 * {@code keyFactory} function passed to the constructor. Values are calculated from keys {@code key1} or {@code key2} 
 * passed to corresponding methods using the {@code valueFactory} function passed to the constructor. 
 * Keys {@code key1}, {@code key2} and its result {@code key} produced by the {@code keyFactory}
 * can not be {@code null} and are compared by equals while values returned by {@code valueFactory} 
 * can be null and are compared by identity. 
 * Entries are expunged from cache lazily on invocation to {@link #get} method when the WeakReferences(or SoftReferences)
 * to values are cleared. 
 * 
 * 

This is the two-arity specialization of {@link ReferenceCache}. * * @param the type of the first key * @param the type of the second key * @param the type of the result of the key's BiFunction * @param the type of the result of the value's BiFunction */ public abstract class BiReferenceCache { private static Log log ; private final ReferenceQueue refQueue = new ReferenceQueue<>(); private final ConcurrentMap> map = new ConcurrentHashMap<>(); private final BiFunction valueFactory; private final BiFunction keyFactory; /** * Construct an instance of {@code BiReferenceCache} * * @param keyFactory a BiFunction mapping a {@code key1,key2 -> key} * @param valueFactory a BiFunction mapping a * {@code key(produced by key's BiFunction with key1, key2) -> value} * @throws NullPointerException if {@code keyFactory} * or {@code valueFactory} is null. */ protected BiReferenceCache(BiFunction keyFactory, BiFunction valueFactory){ this.keyFactory = Objects.requireNonNull(keyFactory); this.valueFactory = Objects.requireNonNull(valueFactory); } private static Log getLog(){ // lazily init the log if(log==null){ // regardless of the concurrency log = LogFactory.getLog(BiReferenceCache.class); } return log; } /** * Look-up the value through the cache with the key * produced by key's BiFunction with given arguments * {@code key1}, {@code key2}. * @param key1 the first key * @param key2 the second key * @return the cached value (maybe null) * @throws NullPointerException if the specified key * {@code key1} or {@code key2} is null. 
*/ public final VR get(T key1, U key2){ if(key1==null||key2==null){ throw new NullPointerException(); } expungeStaleEntries(); KR key = keyFactory.apply(key1, key2); if(key==null){ return null; } Value cache = map.get(key); Value newCache = null; while(true){ if(cache!=null){ VR value = cache.get(); if(value!=null){ return value; } } // lazily construct a new-CacheEntry if(newCache==null){ // create new value VR value = valueFactory.apply(key1, key2); // if new-value is null then just return it if(value==null){ return null; } // wrap value with CacheValue (WeakReference or SoftReference) newCache = newValue(key, value, refQueue); } if(cache==null){ cache = map.putIfAbsent(key, newCache); if(cache==null){ // successfully put new-cache cache = newCache; } }else{ if(map.replace(key, cache, newCache)){ if(cache==newCache){ //newCache is cleared? getLog().error("should not reach here!---->there is a bug in ReferenceCache? newCache.value=" + newCache.get()+" -->"+newCache.getClass()); return valueFactory.apply(key1, key2); } // successfully replaced cleared CacheEntry with our new-CacheEntry cache = newCache; }else{ // retry with current cache-value cache = map.get(key); } } } } /** * expunge all stale cache entries */ private void expungeStaleEntries(){ Value cache; while ((cache = (Value)refQueue.poll()) != null){ // removing by key and mapped value is always safe here because after a Value // is cleared and enqueue-ed it is only equal to itself // (see Value.equals method)... map.remove(cache.getKey(), cache); } } /** * Removes all of the mappings from this cache. * The cache will be empty after this call returns. */ public final void clear(){ // to remove from refQueue while (refQueue.poll() != null) ; map.clear(); } /** * Removes the key (and its corresponding value) from this cache. * This method does nothing if the key is not in the cache. 
* * @param key the key that needs to be removed * @return the previous value associated with {@code key}, or * {@code null} if there was no mapping for {@code key} * ( or already cleared) * @throws NullPointerException if the specified key is null */ public final VR remove(Object key){ expungeStaleEntries();// to remove from refQueue Value val = map.remove(key); if(val!=null){ return val.get(); } return null; } /** * Removes the key (which is produced by key's BiFunction * with given arguments {@code key1}, {@code key2}) * and its corresponding value from this cache. * This method does nothing if the key is not in the cache. * * @param key1 the first key * @param key2 the second key * @return the previous value associated with {@code key} * (which is produced by key's BiFunction with given arguments * {@code key1}, {@code key2}), or {@code null} if there was no mapping * for {@code key} ( or already cleared ) * @throws NullPointerException if the specified key * {@code key1} or {@code key2} is null */ public VR remove(T key1, U key2){ KR key = getKey(key1, key2); if(key==null){ return null; } return remove(key); } /** * Applies key's BiFunction to the given arguments * and then returns the function result. * @param key1 the first key * @param key2 the second key * @return the function result * @throws NullPointerException if the specified key * {@code key1} or {@code key2} is null */ public KR getKey(T key1, U key2){ if(key1==null||key2==null){ throw new NullPointerException(); } return keyFactory.apply(key1, key2); } /** * Constructs a new {@code Value} (WeakReference or SoftReference) * with specified key, value and ReferenceQueue. * @param key the key * @param value the value * @param refQueue the ReferenceQueue * @return a new Value */ protected abstract Value newValue(KR key, VR value, ReferenceQueue refQueue); /** * Common type of value suppliers that are holding a referent. 
* The {@link #equals} and {@link #hashCode} of implementations is defined * to compare the referent by identity and cleared Value is only equal to itself. * * @param type of keys * @param type of values */ protected static interface Value extends Supplier { /** * Gets the key. * * @return key */ KR getKey(); } }


BiSoftCache.java:
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.util.Objects;
import java.util.function.BiFunction;

/**
 * Cache mapping a {@code key -> value}. values are softly but keys are strongly referenced.
 * Keys are calculated from keys {@code key1}, {@code key2} passed to corresponding methods using the
 * {@code keyFactory} function passed to the constructor. Values are calculated from keys {@code key1} or {@code key2} 
 * passed to corresponding methods using the {@code valueFactory} function passed to the constructor. 
 * Keys {@code key1}, {@code key2} and its result {@code key} produced by the {@code keyFactory}
 * can not be {@code null} and are compared by equals while values returned by {@code valueFactory} 
 * can be null and are compared by identity. 
 * Entries are expunged from cache lazily on invocation to {@link #get} method when the SoftReferences
 * to values are cleared. 
 * 
 * 

This is the two-arity specialization of {@link SoftCache}. * * @param the type of the first key * @param the type of the second key * @param the type of the result of the key's BiFunction * @param the type of the result of the value's BiFunction */ public class BiSoftCache extends BiReferenceCache { /** * Construct an instance of {@code BiSoftCache} * * @param keyFactory a BiFunction mapping a {@code key1,key2 -> key} * @param valueFactory a BiFunction mapping a * {@code key(produced by key's BiFunction with key1, key2) -> value} * @throws NullPointerException if {@code keyFactory} * or {@code valueFactory} is null. */ public BiSoftCache(BiFunction keyFactory, BiFunction valueFactory) { super(keyFactory, valueFactory); } /** * Constructs a new {@code Value} using SoftReference * with the given arguments */ @Override protected Value newValue( KR key, VR value, ReferenceQueue refQueue) { return new CacheValue(key, value, refQueue); } /** * CacheValue containing a softly referenced {@code value}. It registers * itself with the {@code refQueue} so that it can be used to expunge * the entry when the {@link BiSoftCache} is cleared. */ private static final class CacheValue extends SoftReference implements Value { private final int hash; private final KR key; private CacheValue(KR key, VR value, ReferenceQueue refQueue) { super(value, refQueue); this.hash = System.identityHashCode(value); // compare by identity this.key = Objects.requireNonNull(key); } @Override public int hashCode() { return hash; } @Override public boolean equals(Object obj) { VR value; return obj == this || obj != null && obj.getClass() == this.getClass() && // cleared CacheValue is only equal to itself (value = this.get()) != null && // compare value by identity value == ((CacheValue) obj).get(); } public KR getKey() { return key; } } }


BiWeakCache.java:
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.Objects;
import java.util.function.BiFunction;

/**
 * Cache mapping a {@code key -> value}. values are weakly but keys are strongly referenced.
 * Keys are calculated from keys {@code key1}, {@code key2} passed to corresponding methods using the
 * {@code keyFactory} function passed to the constructor. Values are calculated from keys {@code key1} or {@code key2} 
 * passed to corresponding methods using the {@code valueFactory} function passed to the constructor. 
 * Keys {@code key1}, {@code key2} and its result {@code key} produced by the {@code keyFactory}
 * can not be {@code null} and are compared by equals while values returned by {@code valueFactory} 
 * can be null and are compared by identity. 
 * Entries are expunged from cache lazily on invocation to {@link #get} method when the WeakReferences
 * to values are cleared. 
 * 
 * 

This is the two-arity specialization of {@link WeakCache}. * * @param the type of the first key * @param the type of the second key * @param the type of the result of the key's BiFunction * @param the type of the result of the value's BiFunction */ public class BiWeakCache extends BiReferenceCache{ /** * Construct an instance of {@code BiWeakCache} * * @param keyFactory a BiFunction mapping a {@code key1,key2 -> key} * @param valueFactory a BiFunction mapping a * {@code key(produced by key's BiFunction with key1, key2) -> value} * @throws NullPointerException if {@code keyFactory} * or {@code valueFactory} is null. */ public BiWeakCache(BiFunction keyFactory, BiFunction valueFactory){ super(keyFactory, valueFactory); } /** * Constructs a new {@code Value} using WeakReference * with the given arguments */ @Override protected Value newValue( KR key, VR value, ReferenceQueue refQueue) { return new CacheValue(key, value, refQueue); } /** * CacheValue containing a weakly referenced {@code value}. It registers * itself with the {@code refQueue} so that it can be used to expunge * the entry when the {@link WeakReference} is cleared. */ private static final class CacheValue extends WeakReference implements Value{ private final int hash; private final KR key; private CacheValue(KR key, VR value, ReferenceQueue refQueue) { super(value, refQueue); this.hash = System.identityHashCode(value); // compare by identity this.key = Objects.requireNonNull(key); } @Override public int hashCode() { return hash; } @Override public boolean equals(Object obj) { VR value; return obj == this || obj != null && obj.getClass() == this.getClass() && // cleared CacheValue is only equal to itself (value = this.get()) != null && // compare value by identity value == ((CacheValue) obj).get(); } public KR getKey() { return key; } } }


-------------------------------------------------------------------------------------------------------------
(注:この文書は主に自分の日常の学習を記録し、ついでに共有することを目的としています。なお、上記のコードについては、並行実行(マルチスレッド)環境でのテストは行っていません)