よく使われる負荷等化アルゴリズム実装


一:負荷等化の概念大規模な分布式アーキテクチャの中で、負荷の高いサービスにとって、往々にして複数の機械からなるクラスタに対応し、要求が来たとき、要求等化をバックエンドサーバーに割り当てるために相応の負荷等化アルゴリズムをサポートする必要があり、相応の負荷等化アルゴリズムによって1台の機械を選択してクライアント要求を処理する.このプロセスをサービスの負荷等化と呼ぶ.二:よく使われる負荷等化アルゴリズムでよく使われる負荷等化アルゴリズムは主にランダム法、ポーリング法、重み付けランダム、重み付けポーリング、最小接続数、一致性hashなどがあり、異なるシーンで使用される負荷はアルゴリズムとは異なり、実際の状況に応じて異なる負荷等化アルゴリズムを選択しなければならない.
次に、比較的一般的な重み付けポーリングと一貫性hashアルゴリズムの実装について説明する.
2.1負荷等化インタフェース
package com.travelsky.pss.react.cn.fzjh.jk;

import java.util.List;

import com.travelsky.pss.react.cn.fzjh.vo.ServiceInstance;

public interface LoadBalance {
	
	//            
	ServiceInstance chooseServerInstance();

	/**
	 *        
	 * 
	 * @param serviceName    
	 * @param version     
	 */
	void setService(String serviceName, String version);
	
	/**
	 *    
	 */
	void init();
	
	/**
	 *         
	 * 
	 * @return
	 */
	List<ServiceInstance> getServiceInstanceList();
	
	/**
	 *         
	 */
	void updateServerInstanceList();
	
	/**
	 *         
	 * 
	 * @param server
	 */
	void isolateServerInstance(String server);

	/**
	 *         
	 * 
	 * @param server
	 */
	void resumeServerInstance(String server);
}

2.2抽象負荷等化ベースクラス
package com.travelsky.pss.react.cn.fzjh.abstrac;

import java.util.List;
import java.util.Random;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.travelsky.pss.react.cn.fzjh.jk.LoadBalance;
import com.travelsky.pss.react.cn.fzjh.rule.DynamicUploadRule;
import com.travelsky.pss.react.cn.fzjh.vo.ServiceInstance;

//         ,                     
public abstract class AbstractLoadBalance implements LoadBalance {

	protected final Logger logger = LoggerFactory
			.getLogger(AbstractLoadBalance.class);
	
	@Resource(name="dynamicUploadRule")
	private transient DynamicUploadRule dynamicUploadRule;

	protected String serviceName;

	protected String version;

	//           
	private transient String serviceKey;

	protected List<ServiceInstance> serviceInstanceList;

	@Override
	public void setService(String serviceName, String version) {
		this.serviceName = serviceName;
		this.version = version;
		this.serviceKey = genKey(serviceName, version);
	}

	//               
	@Override
	public ServiceInstance chooseServerInstance() {
		List<ServiceInstance> allServiceList = getAllServiceInstanceList();
		if (null == allServiceList) {
			return null;
		}
		ServiceInstance serviceInstance = null;
		int indexOfLoop = 0;
		Random random = new Random();
		if (null != allServiceList && allServiceList.size() > 0) {
			int serviceCount = allServiceList.size();
			while (null == serviceInstance && indexOfLoop < serviceCount * 5) {//        ,   serverCount   
				int index = random.nextInt(serviceCount);
				serviceInstance = allServiceList.get(index);
				logger.info("             :" + serviceInstance.getServerName());
				if (serviceInstance.isIsolated()) {
					logger.info("          :" + serviceInstance.getServerName()
							+ ",    ");
					indexOfLoop++;
					serviceInstance = null;
				}
			}
		}
		return serviceInstance;
	}

	@Override
	public void init() {
		//           
		List<ServiceInstance> serviceInstances = getAllServiceInstanceList();
		setServiceInstanceList(serviceInstances);
	}

	@Override
	public List<ServiceInstance> getServiceInstanceList() {
		return serviceInstanceList;
	}

	@Override
	public void updateServerInstanceList() {
		//                     ,    
		List<ServiceInstance> serviceInstanceList = getAllServiceInstanceList();
		setServiceInstanceList(serviceInstanceList);
	}

	@Override
	public void isolateServerInstance(String serverName) {
		for (final ServiceInstance serverInstance : serviceInstanceList) {
			if (serverName.equals(serverInstance.getServerName())) {
				serverInstance.setIsolated(true);
				break;
			}
		}
	}

	@Override
	public void resumeServerInstance(String serverName) {
		for (final ServiceInstance serverInstance : serviceInstanceList) {
			if (serverName.equals(serverInstance.getServerName())) {
				serverInstance.setIsolated(false);
				break;
			}
		}
	}

	//           
	protected ServiceInstance getServiceInstanceByServiceName(String serviceName) {
		ServiceInstance serviceInstance = null;
		List<ServiceInstance> serviceInstances = getAllServiceInstanceList();
		if (null == serviceInstances) {
			return null;
		}
		for (final ServiceInstance instance : serviceInstances) {
			if (instance.getServerName().equals(serviceName)) {
				serviceInstance = instance;
				break;
			}
		}
		return serviceInstance;
	}

	private String genKey(String serviceName, String version) {
		return new StringBuffer().append(serviceName).append('#')
				.append(version).toString();
	}

	private List<ServiceInstance> getAllServiceInstanceList() {
		//              
		List<ServiceInstance> serviceInstanceList = dynamicUploadRule.getServiceInstanceRule();
		return serviceInstanceList;
	}
	
	protected String getServiceNameByServiceKey(String serviceKey){
		int index = serviceKey.indexOf('#');
		return serviceKey.substring(0, index);
	}

	public String getServiceName() {
		return serviceName;
	}

	public void setServiceName(String serviceName) {
		this.serviceName = serviceName;
	}

	public String getVersion() {
		return version;
	}

	public void setVersion(String version) {
		this.version = version;
	}

	public String getServiceKey() {
		return serviceKey;
	}

	public void setServiceKey(String serviceKey) {
		this.serviceKey = serviceKey;
	}

	public void setServiceInstanceList(List<ServiceInstance> serviceInstanceList) {
		this.serviceInstanceList = serviceInstanceList;
	}

}

2.3コンシステンシhashアルゴリズム現在企業でよく使われているメモリベースのキャッシュサービスmemcachedのデフォルトで採用されている負荷等化アルゴリズムはコンシステンシhashアルゴリズムであり、コンシステンシhashアルゴリズムは同じパラメータに対する要求が同じサーバで処理されることを保証することができる.コンシステンシhashアルゴリズムの原理は、利用可能なサーバリストをhashループにマッピングすることができ、クライアントが要求するとkeyのhash値をhashループにマッピングし、時計回りに検索することができ、見つけた最初のマシンは要求されたサービスであり、ループが完了しても見つからない場合は、hashループ上の最初のマシンに要求を転送します.想像できるのは、私たちのクラスタが十分に大きくなれば、対応するサーバを見つけるのに長い間循環する可能性があり、ネットワークIOが大きくなることです.
コンシステンシhashアルゴリズム実装.
package com.travelsky.pss.react.cn.fzjh.loadBalance.rule;

import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import com.travelsky.pss.react.cn.fzjh.abstrac.AbstractLoadBalance;
import com.travelsky.pss.react.cn.fzjh.consistenthash.ConsistentHash;
import com.travelsky.pss.react.cn.fzjh.consistenthash.MurmurHash3;
import com.travelsky.pss.react.cn.fzjh.vo.ServiceInstance;

@Service("consistentHashLoadBalance")
//     hash        
public class ConsistentHashLoadBalance extends AbstractLoadBalance {

	private static final Logger logger = LoggerFactory
			.getLogger(ConsistentHashLoadBalance.class);

	//       ,                
	public static final int VITRUAL_NODE_NUMBER = 1000;

	//                
	public static final int DEFALUT_NODE_NUMBER = 30;

	private AtomicReference<ConsistentHash<ServiceInstance>> hashRing = new AtomicReference<ConsistentHash<ServiceInstance>>();

	//      ,             ,           
	private int numberOfReplicas;
	
	public ServiceInstance chooseServerInstance(String serviceKey) {
		this.serviceName = getServiceNameByServiceKey(serviceKey);
		//                   
		List<ServiceInstance> instances = hashRing.get().getNUniqueBinsFor(serviceName, getServiceInstanceList().size());
		ServiceInstance serviceInstance = null;
		//      ,             
		for(ServiceInstance instance: instances){
			if(instance.isIsolated()){
				logger.info("      ,     :" + serviceName);
			} else {
				//            
				serviceInstance = instance;
				break;
			}
		}
		return serviceInstance;
	}

	@Override
	public void init() {
		super.init();
		numberOfReplicas = getServiceInstanceList().isEmpty()?DEFALUT_NODE_NUMBER:VITRUAL_NODE_NUMBER/getServiceInstanceList().size();
	    buildHashLoop();
	}

	private void buildHashLoop() {
		logger.info("    hash ");
		hashRing.set(new ConsistentHash<ServiceInstance>(MurmurHash3.getInstance(),numberOfReplicas,serviceInstanceList));
	}

}

動的プロファイル構成のサーバ権限値は次のとおりです.
#serviceInstance{serviceName,qzValue,isolated}
instance1=127.0.0.1,100,false
instance2=127.0.0.2,300,false
instance3=127.0.0.3,200,false

 
その依存するいくつかのクラスは、オープンソースプロジェクトのソースコード修正を参照します.
package com.travelsky.pss.react.cn.fzjh.consistenthash;

import java.nio.charset.Charset;

import com.travelsky.pss.react.cn.fzjh.jk.HashFunction;

//     Infinispan    org.infinispan.commons.hash.MurmurHash3  
//Infinispan    : https://github.com/infinispan/infinispan
/**
 * MurmurHash3 implementation in Java, based on Austin Appleby's <a href=
 * "https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp"
 * >original in C</a>
 *
 * Only implementing x64 version, because this should always be faster on 64 bit
 * native processors, even 64 bit being ran with a 32 bit OS; this should also
 * be as fast or faster than the x86 version on some modern 32 bit processors.
 *
 * @author Patrick McFarland
 * @see <a href="http://sites.google.com/site/murmurhash/">MurmurHash website</a>
 * @see <a href="http://en.wikipedia.org/wiki/MurmurHash">MurmurHash entry on Wikipedia</a>
 * @since 5.0
 */
public class MurmurHash3 implements HashFunction {
   private final static MurmurHash3 instance = new MurmurHash3();
   
   public static MurmurHash3 getInstance() {
      return instance;
   }
   
   private MurmurHash3() {
   }
   
   private static final Charset ISO_8859_1 = Charset.forName("ISO-8859-1");

   static class State {
      long h1;
      long h2;

      long k1;
      long k2;

      long c1;
      long c2;
   }

   static long getblock(byte[] key, int i) {
      return
           ((key[i + 0] & 0x00000000000000FFL))
         | ((key[i + 1] & 0x00000000000000FFL) << 8)
         | ((key[i + 2] & 0x00000000000000FFL) << 16)
         | ((key[i + 3] & 0x00000000000000FFL) << 24)
         | ((key[i + 4] & 0x00000000000000FFL) << 32)
         | ((key[i + 5] & 0x00000000000000FFL) << 40)
         | ((key[i + 6] & 0x00000000000000FFL) << 48)
         | ((key[i + 7] & 0x00000000000000FFL) << 56);
   }

   static void bmix(State state) {
      state.k1 *= state.c1;
      state.k1 = (state.k1 << 23) | (state.k1 >>> 64 - 23);
      state.k1 *= state.c2;
      state.h1 ^= state.k1;
      state.h1 += state.h2;

      state.h2 = (state.h2 << 41) | (state.h2 >>> 64 - 41);

      state.k2 *= state.c2;
      state.k2 = (state.k2 << 23) | (state.k2 >>> 64 - 23);
      state.k2 *= state.c1;
      state.h2 ^= state.k2;
      state.h2 += state.h1;

      state.h1 = state.h1 * 3 + 0x52dce729;
      state.h2 = state.h2 * 3 + 0x38495ab5;

      state.c1 = state.c1 * 5 + 0x7b7d159c;
      state.c2 = state.c2 * 5 + 0x6bce6396;
   }

   static long fmix(long k) {
      k ^= k >>> 33;
      k *= 0xff51afd7ed558ccdL;
      k ^= k >>> 33;
      k *= 0xc4ceb9fe1a85ec53L;
      k ^= k >>> 33;

      return k;
   }

   /**
    * Hash a value using the x64 128 bit variant of MurmurHash3
    *
    * @param key value to hash
    * @param seed random value
    * @return 128 bit hashed key, in an array containing two longs
    */
   public static long[] MurmurHash3_x64_128(final byte[] key, final int seed) {
      State state = new State();

      state.h1 = 0x9368e53c2f6af274L ^ seed;
      state.h2 = 0x586dcd208f7cd3fdL ^ seed;

      state.c1 = 0x87c37b91114253d5L;
      state.c2 = 0x4cf5ad432745937fL;

      for (int i = 0; i < key.length / 16; i++) {
         state.k1 = getblock(key, i * 2 * 8);
         state.k2 = getblock(key, (i * 2 + 1) * 8);

         bmix(state);
      }

      state.k1 = 0;
      state.k2 = 0;

      int tail = (key.length >>> 4) << 4;

      switch (key.length & 15) {
         case 15: state.k2 ^= (long) key[tail + 14] << 48;
         case 14: state.k2 ^= (long) key[tail + 13] << 40;
         case 13: state.k2 ^= (long) key[tail + 12] << 32;
         case 12: state.k2 ^= (long) key[tail + 11] << 24;
         case 11: state.k2 ^= (long) key[tail + 10] << 16;
         case 10: state.k2 ^= (long) key[tail + 9] << 8;
         case 9:  state.k2 ^= key[tail + 8];

         case 8:  state.k1 ^= (long) key[tail + 7] << 56;
         case 7:  state.k1 ^= (long) key[tail + 6] << 48;
         case 6:  state.k1 ^= (long) key[tail + 5] << 40;
         case 5:  state.k1 ^= (long) key[tail + 4] << 32;
         case 4:  state.k1 ^= (long) key[tail + 3] << 24;
         case 3:  state.k1 ^= (long) key[tail + 2] << 16;
         case 2:  state.k1 ^= (long) key[tail + 1] << 8;
         case 1:  state.k1 ^= key[tail + 0];
            bmix(state);
      }

      state.h2 ^= key.length;

      state.h1 += state.h2;
      state.h2 += state.h1;

      state.h1 = fmix(state.h1);
      state.h2 = fmix(state.h2);

      state.h1 += state.h2;
      state.h2 += state.h1;

      return new long[] { state.h1, state.h2 };
   }

   /**
    * Hash a value using the x64 64 bit variant of MurmurHash3
    *
    * @param key value to hash
    * @param seed random value
    * @return 64 bit hashed key
    */
   public static long MurmurHash3_x64_64(final byte[] key, final int seed) {
      // Exactly the same as MurmurHash3_x64_128, except it only returns state.h1
      State state = new State();

      state.h1 = 0x9368e53c2f6af274L ^ seed;
      state.h2 = 0x586dcd208f7cd3fdL ^ seed;

      state.c1 = 0x87c37b91114253d5L;
      state.c2 = 0x4cf5ad432745937fL;

      for (int i = 0; i < key.length / 16; i++) {
         state.k1 = getblock(key, i * 2 * 8);
         state.k2 = getblock(key, (i * 2 + 1) * 8);

         bmix(state);
      }

      state.k1 = 0;
      state.k2 = 0;

      int tail = (key.length >>> 4) << 4;

      switch (key.length & 15) {
         case 15: state.k2 ^= (long) key[tail + 14] << 48;
         case 14: state.k2 ^= (long) key[tail + 13] << 40;
         case 13: state.k2 ^= (long) key[tail + 12] << 32;
         case 12: state.k2 ^= (long) key[tail + 11] << 24;
         case 11: state.k2 ^= (long) key[tail + 10] << 16;
         case 10: state.k2 ^= (long) key[tail + 9] << 8;
         case 9:  state.k2 ^= key[tail + 8];

         case 8:  state.k1 ^= (long) key[tail + 7] << 56;
         case 7:  state.k1 ^= (long) key[tail + 6] << 48;
         case 6:  state.k1 ^= (long) key[tail + 5] << 40;
         case 5:  state.k1 ^= (long) key[tail + 4] << 32;
         case 4:  state.k1 ^= (long) key[tail + 3] << 24;
         case 3:  state.k1 ^= (long) key[tail + 2] << 16;
         case 2:  state.k1 ^= (long) key[tail + 1] << 8;
         case 1:  state.k1 ^= key[tail + 0];
            bmix(state);
      }

      state.h2 ^= key.length;

      state.h1 += state.h2;
      state.h2 += state.h1;

      state.h1 = fmix(state.h1);
      state.h2 = fmix(state.h2);

      state.h1 += state.h2;
      state.h2 += state.h1;

      return state.h1;
   }

   /**
    * Hash a value using the x64 32 bit variant of MurmurHash3
    *
    * @param key value to hash
    * @param seed random value
    * @return 32 bit hashed key
    */
   public static int MurmurHash3_x64_32(final byte[] key, final int seed) {
      return (int) (MurmurHash3_x64_64(key, seed) >>> 32);
   }

   /**
    * Hash a value using the x64 128 bit variant of MurmurHash3
    *
    * @param key value to hash
    * @param seed random value
    * @return 128 bit hashed key, in an array containing two longs
    */
   public static long[] MurmurHash3_x64_128(final long[] key, final int seed) {
      State state = new State();

      state.h1 = 0x9368e53c2f6af274L ^ seed;
      state.h2 = 0x586dcd208f7cd3fdL ^ seed;

      state.c1 = 0x87c37b91114253d5L;
      state.c2 = 0x4cf5ad432745937fL;

      for (int i = 0; i < key.length / 2; i++) {
         state.k1 = key[i * 2];
         state.k2 = key[i * 2 + 1];

         bmix(state);
      }

      long tail = key[key.length - 1];

      // Key length is odd
      if ((key.length & 1) == 1) {
         state.k1 ^= tail;
         bmix(state);
      }

      state.h2 ^= key.length * 8;

      state.h1 += state.h2;
      state.h2 += state.h1;

      state.h1 = fmix(state.h1);
      state.h2 = fmix(state.h2);

      state.h1 += state.h2;
      state.h2 += state.h1;

      return new long[] { state.h1, state.h2 };
   }

   /**
    * Hash a value using the x64 64 bit variant of MurmurHash3
    *
    * @param key value to hash
    * @param seed random value
    * @return 64 bit hashed key
    */
   public static long MurmurHash3_x64_64(final long[] key, final int seed) {
      // Exactly the same as MurmurHash3_x64_128, except it only returns state.h1
      State state = new State();

      state.h1 = 0x9368e53c2f6af274L ^ seed;
      state.h2 = 0x586dcd208f7cd3fdL ^ seed;

      state.c1 = 0x87c37b91114253d5L;
      state.c2 = 0x4cf5ad432745937fL;

      for (int i = 0; i < key.length / 2; i++) {
         state.k1 = key[i * 2];
         state.k2 = key[i * 2 + 1];

         bmix(state);
      }

      long tail = key[key.length - 1];

      if (key.length % 2 != 0) {
         state.k1 ^= tail;
         bmix(state);
      }

      state.h2 ^= key.length * 8;

      state.h1 += state.h2;
      state.h2 += state.h1;

      state.h1 = fmix(state.h1);
      state.h2 = fmix(state.h2);

      state.h1 += state.h2;
      state.h2 += state.h1;

      return state.h1;
   }

   /**
    * Hash a value using the x64 32 bit variant of MurmurHash3
    *
    * @param key value to hash
    * @param seed random value
    * @return 32 bit hashed key
    */
   public static int MurmurHash3_x64_32(final long[] key, final int seed) {
      return (int) (MurmurHash3_x64_64(key, seed) >>> 32);
   }

   @Override
   public int hash(byte[] payload) {
      return MurmurHash3_x64_32(payload, 9001);
   }

   /**
    * Hashes a byte array efficiently.
    *
    * @param payload a byte array to hash
    * @return a hash code for the byte array
    */
   public static int hash(long[] payload) {
      return MurmurHash3_x64_32(payload, 9001);
   }

   @Override
   public int hash(Object o) {
      if (o instanceof byte[])
         return hash((byte[]) o);
      else if (o instanceof long[])
         return hash((long[]) o);
      else if (o instanceof String)
         return hash(((String) o).getBytes(ISO_8859_1));
      else
         return hash(o.hashCode());
   }

   @Override
   public boolean equals(Object other) {
      return other != null && other.getClass() == getClass();
   }

   @Override
   public int hashCode() {
      return 0;
   }

   @Override
   public String toString() {
      return "MurmurHash3";
   }
}

 
package com.travelsky.pss.react.cn.fzjh.consistenthash;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

import com.travelsky.pss.react.cn.fzjh.jk.HashFunction;

//      Cloudera Flume    com.cloudera.util.consistenthash.ConsistentHash  
// Cloudera Flume    : https://github.com/cloudera/flume
/**
 * This is an implementation of a consistent hash. T is the type of a bin.
 * 
 * It is mostly copied from Tom White's implementation found here:
 * http://www.lexemetech.com/2007/11/consistent-hashing.html
 * 
 * Blog comments mention that there may be a bug in this implementation -- if
 * there is a key collision we may lose bins. Probabilistically this is small,
 * and even smaller with a higher more replication factor. This could be made
 * even rarer by enlarging the circle by using Long instead of Integer.
 * 
 * getNBins and getNUniqBins return ordered lists of bins for a particular
 * object. This is useful for assigning backups if the first bin fails.
 * 
 * This datastructure is not threadsafe.
 */
public class ConsistentHash<T> {
  // when looking for n unique bins, give up after a streak of MAX_DUPES
  // duplicates
  public final static int MAX_DUPES = 10;

  // # of times a bin is replicated in hash circle. (for better load balancing)
  private final int numberOfReplicas;

  private final HashFunction hashFunction;
  private final SortedMap<Integer, T> circle = new TreeMap<Integer, T>();

  public ConsistentHash(HashFunction hashFunction, int numberOfReplicas,
      Collection<T> nodes) {
    this.hashFunction = hashFunction;
    this.numberOfReplicas = numberOfReplicas;

    for (T node : nodes) {
      addBin(node);
    }
  }

  /**
   * Add a new bin to the consistent hash
   * 
   * This assumes that the bin's toString method is immutable.
   * 
   * This is not thread safe.
   */
  public void addBin(T bin) {
    for (int i = 0; i < numberOfReplicas; i++) {
      // The string addition forces each replica to have different hash
      circle.put(hashFunction.hash(bin.toString() + i), bin);
    }
  }

  /**
   * Remove a bin from the consistent hash
   * 
   * This assumes that the bin's toString method is immutable.
   * 
   * This is not thread safe.
   */
  public void removeBin(T bin) {
    for (int i = 0; i < numberOfReplicas; i++) {
      // The string addition forces each replica to be different. This needs
      // to resolve to the same keys as addBin.
      circle.remove(hashFunction.hash(bin.toString() + i));
    }
  }

  /**
   * This returns the closest bin for the object. If the object is the bin it
   * should be an exact hit, but if it is a value traverse to find closest
   * subsequent bin.
   */
  public T getBinFor(Object key) {
    if (circle.isEmpty()) {
      return null;
    }
    int hash = hashFunction.hash(key);
    T bin = circle.get(hash);

    if (bin == null) {
      // inexact match -- find the next value in the circle
      SortedMap<Integer, T> tailMap = circle.tailMap(hash);
      hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
      bin = circle.get(hash);
    }
    return bin;
  }

  /**
   * This returns the closest n bins in order for the object. There may be
   * duplicates.
   */
  public List<T> getNBinsFor(Object key, int n) {
    if (circle.isEmpty()) {
      return Collections.<T> emptyList();
    }

    List<T> list = new ArrayList<T>(n);
    int hash = hashFunction.hash(key);
    for (int i = 0; i < n; i++) {
      if (!circle.containsKey(hash)) {
        // go to next element.
        SortedMap<Integer, T> tailMap = circle.tailMap(hash);
        hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
      }
      list.add(circle.get(hash));

      // was a hit so we increment and loop to find the next bin in the
      // circle
      hash++;
    }
    return list;
  }

  /**
   * This returns the closest n bins in order for the object. There is extra
   * code that forces the bin values to be unique.
   * 
   * This will return a list that has all the bins (and is smaller than n) if n
   * > number of bins.
   */
  public List<T> getNUniqueBinsFor(Object key, int n) {
    if (circle.isEmpty()) {
      return Collections.<T> emptyList();
    }

    List<T> list = new ArrayList<T>(n);
    int hash = hashFunction.hash(key);
    int duped = 0;
    for (int i = 0; i < n; i++) {
      if (!circle.containsKey(hash)) {
        // go to next element.
        SortedMap<Integer, T> tailMap = circle.tailMap(hash);
        hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
      }
      T candidate = circle.get(hash);
      if (!list.contains(candidate)) {
        duped = 0;
        list.add(candidate);
      } else {
        duped++;
        i--; // try again.
        if (duped > MAX_DUPES) {
          i++; // we've been duped too many times, just skip to next, returning
          // fewer than n
        }

      }

      // find the next element in the circle
      hash++;
    }
    return list;
  }
}

2.4コンシステンシhashテスト効果
    applicationContext.xmlプロファイル
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
         http://www.springframework.org/schema/context
         http://www.springframework.org/schema/context/spring-context-3.2.xsd">

	<context:annotation-config />
	<!--     -->
	<context:component-scan base-package="com.travelsky.pss.react.cn.fzjh" />

	<bean id="weightedRoundRoBinLoadBalance"
		class="com.travelsky.pss.react.cn.fzjh.loadBalance.rule.WeightedRoundRoBinLoadBalance"
		init-method="init" />

	<bean id="consistentHashLoadBalance"
		class="com.travelsky.pss.react.cn.fzjh.loadBalance.rule.ConsistentHashLoadBalance"
		init-method="init" />
		
    <!--            -->
	<bean id="dynamicUploadConfig"
		class="org.apache.commons.configuration.PropertiesConfiguration"
		init-method="load">
		<property name="file" value="file:config/service.properties" />
		<property name="reloadingStrategy">
			<bean class="org.apache.commons.configuration.reloading.FileChangedReloadingStrategy">
				<property name="refreshDelay" value="60000"></property>
			</bean>
		</property>
	</bean>
		
</beans>

テストクラス
package com.travelsky.pss.react_cn_fzjh;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.travelsky.pss.react.cn.fzjh.loadBalance.rule.ConsistentHashLoadBalance;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:applicationContext.xml" })
public class TestConsistentHashLoadBalance {
	
    @Autowired
	private transient ConsistentHashLoadBalance consistentHashLoadBalance;
    
    
    @Test
    public void testConsistentLoadBalance() throws InterruptedException{
    	ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(15, 20, 60,
				TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3));
		for(int i=1;i<=15;i++){
			final int index = i;
			poolExecutor.execute(new Runnable() {
				@Override
				public void run() {
					System.out.println("    : " + Thread.currentThread().getName() + ",serviceKey:" + "127.0.0." + index  + ",    :" + consistentHashLoadBalance.chooseServerInstance("127.0.0." + index + "#1.0.0" ).getServerName());
				}
			});
		}
    }

}

////////////////////////////////////        ////////////////////////////////////////////

    : pool-1-thread-2,serviceKey:127.0.0.2,    :127.0.0.2
    : pool-1-thread-5,serviceKey:127.0.0.5,    :127.0.0.3
    : pool-1-thread-10,serviceKey:127.0.0.10,    :127.0.0.1
    : pool-1-thread-9,serviceKey:127.0.0.9,    :127.0.0.3
    : pool-1-thread-3,serviceKey:127.0.0.3,    :127.0.0.1
    : pool-1-thread-4,serviceKey:127.0.0.4,    :127.0.0.2
    : pool-1-thread-1,serviceKey:127.0.0.1,    :127.0.0.1
    : pool-1-thread-14,serviceKey:127.0.0.14,    :127.0.0.1
    : pool-1-thread-6,serviceKey:127.0.0.6,    :127.0.0.1
    : pool-1-thread-13,serviceKey:127.0.0.13,    :127.0.0.2
    : pool-1-thread-7,serviceKey:127.0.0.7,    :127.0.0.2
    : pool-1-thread-8,serviceKey:127.0.0.8,    :127.0.0.2
    : pool-1-thread-11,serviceKey:127.0.0.11,    :127.0.0.2
    : pool-1-thread-15,serviceKey:127.0.0.15,    :127.0.0.1
    : pool-1-thread-12,serviceKey:127.0.0.12,    :127.0.0.1

//////////////////////////////////////////////////////////////////

    : pool-1-thread-1,serviceKey:127.0.0.1,    :127.0.0.1
    : pool-1-thread-2,serviceKey:127.0.0.2,    :127.0.0.2
    : pool-1-thread-10,serviceKey:127.0.0.10,    :127.0.0.1
    : pool-1-thread-5,serviceKey:127.0.0.5,    :127.0.0.3
    : pool-1-thread-14,serviceKey:127.0.0.14,    :127.0.0.1
    : pool-1-thread-6,serviceKey:127.0.0.6,    :127.0.0.1
    : pool-1-thread-3,serviceKey:127.0.0.3,    :127.0.0.1
    : pool-1-thread-7,serviceKey:127.0.0.7,    :127.0.0.2
    : pool-1-thread-11,serviceKey:127.0.0.11,    :127.0.0.2
    : pool-1-thread-15,serviceKey:127.0.0.15,    :127.0.0.1
    : pool-1-thread-9,serviceKey:127.0.0.9,    :127.0.0.3
    : pool-1-thread-13,serviceKey:127.0.0.13,    :127.0.0.2
    : pool-1-thread-4,serviceKey:127.0.0.4,    :127.0.0.2
    : pool-1-thread-8,serviceKey:127.0.0.8,    :127.0.0.2
    : pool-1-thread-12,serviceKey:127.0.0.12,    :127.0.0.1

同じリクエストがhashを経て見つかった処理リクエストのサーバが同じであることがわかる.
2.5重み付けポーリングは、パフォーマンスが高く、負荷が低いサーバに対して、より多くのリクエストを処理するために、より高い重み値を与えることができます.
     
package com.travelsky.pss.react.cn.fzjh.loadBalance.rule;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import com.travelsky.pss.react.cn.fzjh.abstrac.AbstractLoadBalance;
import com.travelsky.pss.react.cn.fzjh.rule.DynamicUploadRule;
import com.travelsky.pss.react.cn.fzjh.vo.ServiceInstance;

//              
@Service("weightedRoundRoBinLoadBalance")
public class WeightedRoundRoBinLoadBalance extends AbstractLoadBalance {

	protected final Logger logger = LoggerFactory
			.getLogger(WeightedRoundRoBinLoadBalance.class);
	
	@Resource(name="dynamicUploadRule")
	private DynamicUploadRule dynamicUploadRule;

	//                
	private int gcd;

	//         
	private int max;

	//     
	private int cycle;

	//            
	private int currentIndex = -1;
	
	/**
	 *            ,                  ,   ,AtomicReference      
	 */
	private AtomicReference<List<String>> WRRList = new AtomicReference<List<String>>();

	@Override
	public void init() {
		super.init();
		buildWRRList();
	}

	@Override
	public ServiceInstance chooseServerInstance() {
		if (!isNotEmpty(WRRList.get())) {
			logger.info("             ,              ");
			return super.chooseServerInstance();
		}
		ServiceInstance serviceInstance = null;
		synchronized (this) {
			int index = 0;
			while (index < cycle && null == serviceInstance) {
				currentIndex = (currentIndex + 1) % WRRList.get().size();
				String serviceName = WRRList.get().get(currentIndex);
				serviceInstance = getServiceInstanceByServiceName(serviceName);
				if (null == serviceInstance || serviceInstance.isIsolated()) {
					index++;
				}
			}
		}
		return serviceInstance;
	}

    /**
    *                  
    */
	private void buildWRRList() {
		boolean isGetSucc = false;
		if (!getServiceInstanceList().isEmpty()) {
			logger.info("          ,                 ");
			isGetSucc = calcLoadFactors();
		}
		if (isGetSucc) {
			//      server  
			int total = getServiceInstanceList().size();
			//         
			int i = -1;
			//      
			int cw = 0;
			List<String> newWrrList = new ArrayList<String>(total);
			//      ,      ,            ,               ,      ;
			//        ,          ,       ;
			//            ,             ,      。       
			for (int j = 0; j < cycle; j++) {
				while (true) {
					i = (i + 1) % total;//      
					if (i == 0) {
						cw = cw - gcd;//        
						if (cw <= 0) {
							cw = max;
							//            
							if (cw == 0) {
								newWrrList.add(null);
								break;
							}
						}
					}
					ServiceInstance serviceInstance = getServiceInstanceList()
							.get(i);
					String serverName = serviceInstance.getServerName();
					//        server        servername.
					if(serviceInstance.getQzValue() >= cw){
						newWrrList.add(serverName);
						break;
					}
				}
			}
			WRRList.set(newWrrList);
		}
	}

	/**
	 *         ,    ,    
	 * @return
	 */
	private boolean calcLoadFactors() {
		//          ,                    (         ,     DB   )
		/**
		 * 1:         2:                   
		 */
		//            
		List<Integer> factors = getDefault();
		if (null == factors || factors.size() == 0) {
			return false;
		}
		//         eg:10,20       10
		gcd = calcMaxGCD(factors);
		max = calcMaxValue(factors);
		cycle = calcCycle(factors, gcd);
		return true;
	}

	/**
	 *       ,    /         
	 * eg:100/100 + 200/100 + 300/100 = 6
	 * @param factors          
	 * @param gcd      
	 * @return
	 */
	private int calcCycle(List<Integer> factors, int gcd) {
		int cycle = 0;
		for (int i = 0; i < factors.size(); i++) {
			cycle += factors.get(i) / gcd;
		}
		return cycle;
	}

	/**
	 *          
	 * 
	 * @param factors
	 * @return
	 */
	private int calcMaxValue(List<Integer> factors) {
		int max = 0;
		for (int i = 0; i < factors.size(); i++) {
			if (factors.get(i) > max) {
				max = factors.get(i);
			}
		}
		return max;
	}

	/**
	 *        
	 * @param factors
	 * @return
	 */
	private int calcMaxGCD(List<Integer> factors) {
		int max = 0;
		for (int i = 0; i < factors.size(); i++) {
			if (factors.get(i) > 0) {
				max = divisor(factors.get(i), max);
			}
		}
		return max;
	}

	/**
	 *          
	 * @param m      
	 * @param n      
	 * @return int     
	 */
	private int divisor(int m, int n) {
		if (m < n) {
			int temp;
			temp = m;
			m = n;
			n = temp;
		}
		if (0 == n) {
			return m;
		}
		return divisor(m - n, n);
	}

	/**
	 *          
	 * @param serviceNames    
	 * @return
	 */
	private List<Integer> getDefault() {
		List<Integer> list = new ArrayList<Integer>();
		List<ServiceInstance> instances = dynamicUploadRule.getServiceInstanceRule();
		for (final ServiceInstance serviceInstance : instances) {
		   list.add(serviceInstance.getQzValue());
	     }
		return list;
	}

	/**
	 *     
	 * @param list
	 * @return
	 */
	private boolean isNotEmpty(List<String> list) {
		return null != list && list.size() > 0;
	}

}

2.6重み付けポーリングアルゴリズムによる効果
 
package com.travelsky.pss.react_cn_fzjh;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.travelsky.pss.react.cn.fzjh.loadBalance.rule.WeightedRoundRoBinLoadBalance;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:applicationContext.xml" })
public class TestWeightRoundRoBinLoadBalance {
	
    @Autowired
	private transient WeightedRoundRoBinLoadBalance weightedRoundRoBinLoadBalance;
    
    
    @Test
    public void testWeightedRoundRoBinLoadBalance(){
    	ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(15, 20, 60,
				TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3));
		for(int i=1;i<=15;i++){
			poolExecutor.execute(new Runnable() {
				@Override
				public void run() {
					System.out.println("    : " + Thread.currentThread().getName() + "    :" + weightedRoundRoBinLoadBalance.chooseServerInstance().getServerName());
				}
			});
		}
    }

}
///////////////////////////////////////  /////////////////////////////////////////
    : pool-1-thread-2    :127.0.0.2
    : pool-1-thread-1    :127.0.0.2
    : pool-1-thread-5    :127.0.0.3
    : pool-1-thread-6    :127.0.0.1
    : pool-1-thread-3    :127.0.0.2
    : pool-1-thread-9    :127.0.0.3
    : pool-1-thread-10    :127.0.0.2
    : pool-1-thread-13    :127.0.0.2
    : pool-1-thread-11    :127.0.0.1
    : pool-1-thread-7    :127.0.0.3
    : pool-1-thread-15    :127.0.0.2
    : pool-1-thread-4    :127.0.0.3
    : pool-1-thread-8    :127.0.0.2
    : pool-1-thread-12    :127.0.0.2

ソースのダウンロード: