Implementations of Commonly Used Load Balancing Algorithms
1. The concept of load balancing
In a large distributed architecture, a heavily loaded service usually corresponds to a cluster of several machines. When a request arrives, a load balancing algorithm is needed to spread requests evenly across the backend servers, i.e. to pick one machine from the cluster to handle the client request. This process is called service load balancing.
2. Commonly used load balancing algorithms
The most common load balancing algorithms are random, round robin, weighted random, weighted round robin, least connections and consistent hash. Different scenarios call for different algorithms, so the choice should be made according to the actual situation.
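Of these, plain (unweighted) round robin is the simplest to picture: requests are handed to the servers in turn. As a point of comparison with the weighted and consistent-hash variants implemented later in this article, a minimal sketch might look like the following; the class name SimpleRoundRobin and its fields are illustrative only.
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch of plain (unweighted) round robin.
class SimpleRoundRobin {
    private final List<String> servers;
    private final AtomicInteger position = new AtomicInteger(0);

    SimpleRoundRobin(List<String> servers) {
        this.servers = servers;
    }

    String next() {
        // Cycle through the server list; masking keeps the index
        // non-negative even if the counter eventually overflows.
        int index = (position.getAndIncrement() & Integer.MAX_VALUE) % servers.size();
        return servers.get(index);
    }
}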
Below, two of the more common algorithms are described in detail: weighted round robin and consistent hash.
2.1 Load balancing interface
package com.travelsky.pss.react.cn.fzjh.jk;

import java.util.List;

import com.travelsky.pss.react.cn.fzjh.vo.ServiceInstance;

public interface LoadBalance {

    /**
     * Choose one server instance to handle the current request.
     */
    ServiceInstance chooseServerInstance();

    /**
     * Set the target service.
     *
     * @param serviceName service name
     * @param version     service version
     */
    void setService(String serviceName, String version);

    /**
     * Initialize the load balancer (load the server instance list, etc.).
     */
    void init();

    /**
     * @return the current list of server instances
     */
    List<ServiceInstance> getServiceInstanceList();

    /**
     * Refresh the server instance list from the configuration source.
     */
    void updateServerInstanceList();

    /**
     * Isolate (temporarily remove from rotation) a server instance by name.
     *
     * @param server server name
     */
    void isolateServerInstance(String server);

    /**
     * Resume a previously isolated server instance.
     *
     * @param server server name
     */
    void resumeServerInstance(String server);
}
2.2 Abstract load balancing base class
package com.travelsky.pss.react.cn.fzjh.abstrac;

import java.util.List;
import java.util.Random;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.travelsky.pss.react.cn.fzjh.jk.LoadBalance;
import com.travelsky.pss.react.cn.fzjh.rule.DynamicUploadRule;
import com.travelsky.pss.react.cn.fzjh.vo.ServiceInstance;

/**
 * Abstract base class shared by the concrete load balancers. It holds the
 * service metadata and the instance list, and provides random selection as
 * the default strategy.
 */
public abstract class AbstractLoadBalance implements LoadBalance {

    protected final Logger logger = LoggerFactory
            .getLogger(AbstractLoadBalance.class);

    @Resource(name = "dynamicUploadRule")
    private transient DynamicUploadRule dynamicUploadRule;

    protected String serviceName;

    protected String version;

    // Key identifying the target service, in the form serviceName#version
    private transient String serviceKey;

    protected List<ServiceInstance> serviceInstanceList;

    @Override
    public void setService(String serviceName, String version) {
        this.serviceName = serviceName;
        this.version = version;
        this.serviceKey = genKey(serviceName, version);
    }

    // Default strategy: pick a random, non-isolated instance
    @Override
    public ServiceInstance chooseServerInstance() {
        List<ServiceInstance> allServiceList = getAllServiceInstanceList();
        if (null == allServiceList) {
            return null;
        }
        ServiceInstance serviceInstance = null;
        int indexOfLoop = 0;
        Random random = new Random();
        if (allServiceList.size() > 0) {
            int serviceCount = allServiceList.size();
            // Retry a bounded number of times so we do not spin forever when
            // most (or all) instances are isolated
            while (null == serviceInstance && indexOfLoop < serviceCount * 5) {
                int index = random.nextInt(serviceCount);
                serviceInstance = allServiceList.get(index);
                logger.info("Candidate instance: " + serviceInstance.getServerName());
                if (serviceInstance.isIsolated()) {
                    logger.info("Instance " + serviceInstance.getServerName()
                            + " is isolated, picking another one");
                    indexOfLoop++;
                    serviceInstance = null;
                }
            }
        }
        return serviceInstance;
    }

    @Override
    public void init() {
        // Load the server instance list from the dynamic configuration
        List<ServiceInstance> serviceInstances = getAllServiceInstanceList();
        setServiceInstanceList(serviceInstances);
    }

    @Override
    public List<ServiceInstance> getServiceInstanceList() {
        return serviceInstanceList;
    }

    @Override
    public void updateServerInstanceList() {
        // Re-read the configuration and refresh the cached instance list
        List<ServiceInstance> serviceInstanceList = getAllServiceInstanceList();
        setServiceInstanceList(serviceInstanceList);
    }

    @Override
    public void isolateServerInstance(String serverName) {
        for (final ServiceInstance serverInstance : serviceInstanceList) {
            if (serverName.equals(serverInstance.getServerName())) {
                serverInstance.setIsolated(true);
                break;
            }
        }
    }

    @Override
    public void resumeServerInstance(String serverName) {
        for (final ServiceInstance serverInstance : serviceInstanceList) {
            if (serverName.equals(serverInstance.getServerName())) {
                serverInstance.setIsolated(false);
                break;
            }
        }
    }

    // Look up an instance by its server name
    protected ServiceInstance getServiceInstanceByServiceName(String serviceName) {
        ServiceInstance serviceInstance = null;
        List<ServiceInstance> serviceInstances = getAllServiceInstanceList();
        if (null == serviceInstances) {
            return null;
        }
        for (final ServiceInstance instance : serviceInstances) {
            if (instance.getServerName().equals(serviceName)) {
                serviceInstance = instance;
                break;
            }
        }
        return serviceInstance;
    }

    private String genKey(String serviceName, String version) {
        return new StringBuilder().append(serviceName).append('#')
                .append(version).toString();
    }

    private List<ServiceInstance> getAllServiceInstanceList() {
        // The instance list (and its weights) comes from the dynamically reloaded rule
        return dynamicUploadRule.getServiceInstanceRule();
    }

    protected String getServiceNameByServiceKey(String serviceKey) {
        int index = serviceKey.indexOf('#');
        return serviceKey.substring(0, index);
    }

    public String getServiceName() {
        return serviceName;
    }

    public void setServiceName(String serviceName) {
        this.serviceName = serviceName;
    }

    public String getVersion() {
        return version;
    }

    public void setVersion(String version) {
        this.version = version;
    }

    public String getServiceKey() {
        return serviceKey;
    }

    public void setServiceKey(String serviceKey) {
        this.serviceKey = serviceKey;
    }

    public void setServiceInstanceList(List<ServiceInstance> serviceInstanceList) {
        this.serviceInstanceList = serviceInstanceList;
    }
}
2.3 Consistent hash algorithm
memcached, the memory-based cache service widely used in industry, uses consistent hashing as its default load balancing algorithm. Consistent hashing guarantees that requests with the same parameter are handled by the same server. The principle is to map the list of available servers onto a hash ring; when a client request arrives, the hash of its key is mapped onto the same ring and the ring is searched clockwise. The first machine found serves the request, and if the search wraps all the way around without finding one, the request is sent to the first machine on the ring. One can imagine that if the cluster becomes large enough, walking the ring to find the right server can take a long time, and the network I/O involved grows accordingly.
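The clockwise lookup can be pictured with a sorted map: each server occupies one or more positions on the ring, and a request key is routed to the first server at or after its hash, wrapping around to the first entry when nothing follows. A minimal sketch of just this lookup step, independent of the full implementation below (HashRingSketch is an illustrative name):
import java.util.Map;
import java.util.TreeMap;

// Sketch of the clockwise ring lookup only; the full implementation used in
// this article (with virtual nodes and fallback candidates) follows below.
class HashRingSketch {
    private final TreeMap<Integer, String> ring = new TreeMap<Integer, String>();

    void addServer(int hash, String server) {
        ring.put(hash, server);
    }

    String lookup(int keyHash) {
        // First server at or after the key's position on the ring...
        Map.Entry<Integer, String> entry = ring.ceilingEntry(keyHash);
        // ...or wrap around to the first server on the ring.
        return entry != null ? entry.getValue() : ring.firstEntry().getValue();
    }
}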
A consistent hash load balancer implementation:
package com.travelsky.pss.react.cn.fzjh.loadBalance.rule;

import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import com.travelsky.pss.react.cn.fzjh.abstrac.AbstractLoadBalance;
import com.travelsky.pss.react.cn.fzjh.consistenthash.ConsistentHash;
import com.travelsky.pss.react.cn.fzjh.consistenthash.MurmurHash3;
import com.travelsky.pss.react.cn.fzjh.vo.ServiceInstance;

// Consistent hash load balancer: the same service key is always routed to the same instance
@Service("consistentHashLoadBalance")
public class ConsistentHashLoadBalance extends AbstractLoadBalance {

    private static final Logger logger = LoggerFactory
            .getLogger(ConsistentHashLoadBalance.class);

    // Total number of virtual nodes spread over the hash ring, for a more even distribution
    public static final int VIRTUAL_NODE_NUMBER = 1000;

    // Replica count to fall back to when the instance list is empty
    public static final int DEFAULT_NODE_NUMBER = 30;

    private AtomicReference<ConsistentHash<ServiceInstance>> hashRing = new AtomicReference<ConsistentHash<ServiceInstance>>();

    // Number of times each instance is replicated on the ring; derived from the instance count
    private int numberOfReplicas;

    public ServiceInstance chooseServerInstance(String serviceKey) {
        this.serviceName = getServiceNameByServiceKey(serviceKey);
        // Ordered candidate list for this key (closest bin on the ring first)
        List<ServiceInstance> instances = hashRing.get().getNUniqueBinsFor(serviceName, getServiceInstanceList().size());
        ServiceInstance serviceInstance = null;
        // Walk the candidates and take the first one that is not isolated
        for (ServiceInstance instance : instances) {
            if (instance.isIsolated()) {
                logger.info("Instance is isolated, trying the next candidate for key: " + serviceName);
            } else {
                serviceInstance = instance;
                break;
            }
        }
        return serviceInstance;
    }

    @Override
    public void init() {
        super.init();
        numberOfReplicas = getServiceInstanceList().isEmpty() ? DEFAULT_NODE_NUMBER
                : VIRTUAL_NODE_NUMBER / getServiceInstanceList().size();
        buildHashLoop();
    }

    private void buildHashLoop() {
        logger.info("Building the consistent hash ring");
        hashRing.set(new ConsistentHash<ServiceInstance>(MurmurHash3.getInstance(), numberOfReplicas, serviceInstanceList));
    }
}
The server weights in the dynamically reloaded configuration file are as follows:
#serviceInstance{serviceName,qzValue,isolated}
instance1=127.0.0.1,100,false
instance2=127.0.0.2,300,false
instance3=127.0.0.3,200,false
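The ServiceInstance value object itself is not included in the article. A minimal sketch, inferred from the configuration format above and from the calls made on it elsewhere (getServerName, getQzValue, isIsolated/setIsolated), might look like this; the field names are assumptions:
package com.travelsky.pss.react.cn.fzjh.vo;

// Minimal sketch of the value object, inferred from how it is used in this article.
public class ServiceInstance {

    private String serverName;   // e.g. "127.0.0.1"
    private int qzValue;         // weight value from the properties file
    private boolean isolated;    // true if the instance is temporarily out of rotation

    public ServiceInstance() {
    }

    public ServiceInstance(String serverName, int qzValue, boolean isolated) {
        this.serverName = serverName;
        this.qzValue = qzValue;
        this.isolated = isolated;
    }

    public String getServerName() { return serverName; }
    public void setServerName(String serverName) { this.serverName = serverName; }

    public int getQzValue() { return qzValue; }
    public void setQzValue(int qzValue) { this.qzValue = qzValue; }

    public boolean isIsolated() { return isolated; }
    public void setIsolated(boolean isolated) { this.isolated = isolated; }

    // ConsistentHash keys each replica by bin.toString() + i, so toString should
    // be stable and unique per server; using the server name is one reasonable choice.
    @Override
    public String toString() {
        return serverName;
    }
}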
The hashing classes that this implementation depends on are adapted from open-source project source code:
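One of those dependencies, the HashFunction interface, is also not shown in the article. Judging from the methods overridden in MurmurHash3 and the calls made in ConsistentHash, a minimal version could be sketched as follows. Note that the real Infinispan Hash interface also declares an int overload, which is what the hash(o.hashCode()) fallback in MurmurHash3 relies on; the keys used in this article are always Strings, so that branch is never taken here.
package com.travelsky.pss.react.cn.fzjh.jk;

// Sketch of the hash-function abstraction used by MurmurHash3 and ConsistentHash below.
// The real interface is not shown in the article; these two methods are inferred from
// the @Override annotations in MurmurHash3 and the calls made in ConsistentHash.
public interface HashFunction {

    /** Hash a raw byte payload to a 32-bit value. */
    int hash(byte[] payload);

    /** Hash an arbitrary object (strings, keys, bins) to a 32-bit value. */
    int hash(Object o);
}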
package com.travelsky.pss.react.cn.fzjh.consistenthash;
import java.nio.charset.Charset;
import com.travelsky.pss.react.cn.fzjh.jk.HashFunction;
// Adapted from Infinispan's org.infinispan.commons.hash.MurmurHash3
// Infinispan source: https://github.com/infinispan/infinispan
/**
* MurmurHash3 implementation in Java, based on Austin Appleby's <a href=
* "https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp"
* >original in C</a>
*
* Only implementing x64 version, because this should always be faster on 64 bit
* native processors, even 64 bit being ran with a 32 bit OS; this should also
* be as fast or faster than the x86 version on some modern 32 bit processors.
*
* @author Patrick McFarland
* @see <a href="http://sites.google.com/site/murmurhash/">MurmurHash website</a>
* @see <a href="http://en.wikipedia.org/wiki/MurmurHash">MurmurHash entry on Wikipedia</a>
* @since 5.0
*/
public class MurmurHash3 implements HashFunction {
private final static MurmurHash3 instance = new MurmurHash3();
public static MurmurHash3 getInstance() {
return instance;
}
private MurmurHash3() {
}
private static final Charset ISO_8859_1 = Charset.forName("ISO-8859-1");
static class State {
long h1;
long h2;
long k1;
long k2;
long c1;
long c2;
}
static long getblock(byte[] key, int i) {
return
((key[i + 0] & 0x00000000000000FFL))
| ((key[i + 1] & 0x00000000000000FFL) << 8)
| ((key[i + 2] & 0x00000000000000FFL) << 16)
| ((key[i + 3] & 0x00000000000000FFL) << 24)
| ((key[i + 4] & 0x00000000000000FFL) << 32)
| ((key[i + 5] & 0x00000000000000FFL) << 40)
| ((key[i + 6] & 0x00000000000000FFL) << 48)
| ((key[i + 7] & 0x00000000000000FFL) << 56);
}
static void bmix(State state) {
state.k1 *= state.c1;
state.k1 = (state.k1 << 23) | (state.k1 >>> 64 - 23);
state.k1 *= state.c2;
state.h1 ^= state.k1;
state.h1 += state.h2;
state.h2 = (state.h2 << 41) | (state.h2 >>> 64 - 41);
state.k2 *= state.c2;
state.k2 = (state.k2 << 23) | (state.k2 >>> 64 - 23);
state.k2 *= state.c1;
state.h2 ^= state.k2;
state.h2 += state.h1;
state.h1 = state.h1 * 3 + 0x52dce729;
state.h2 = state.h2 * 3 + 0x38495ab5;
state.c1 = state.c1 * 5 + 0x7b7d159c;
state.c2 = state.c2 * 5 + 0x6bce6396;
}
static long fmix(long k) {
k ^= k >>> 33;
k *= 0xff51afd7ed558ccdL;
k ^= k >>> 33;
k *= 0xc4ceb9fe1a85ec53L;
k ^= k >>> 33;
return k;
}
/**
* Hash a value using the x64 128 bit variant of MurmurHash3
*
* @param key value to hash
* @param seed random value
* @return 128 bit hashed key, in an array containing two longs
*/
public static long[] MurmurHash3_x64_128(final byte[] key, final int seed) {
State state = new State();
state.h1 = 0x9368e53c2f6af274L ^ seed;
state.h2 = 0x586dcd208f7cd3fdL ^ seed;
state.c1 = 0x87c37b91114253d5L;
state.c2 = 0x4cf5ad432745937fL;
for (int i = 0; i < key.length / 16; i++) {
state.k1 = getblock(key, i * 2 * 8);
state.k2 = getblock(key, (i * 2 + 1) * 8);
bmix(state);
}
state.k1 = 0;
state.k2 = 0;
int tail = (key.length >>> 4) << 4;
switch (key.length & 15) {
case 15: state.k2 ^= (long) key[tail + 14] << 48;
case 14: state.k2 ^= (long) key[tail + 13] << 40;
case 13: state.k2 ^= (long) key[tail + 12] << 32;
case 12: state.k2 ^= (long) key[tail + 11] << 24;
case 11: state.k2 ^= (long) key[tail + 10] << 16;
case 10: state.k2 ^= (long) key[tail + 9] << 8;
case 9: state.k2 ^= key[tail + 8];
case 8: state.k1 ^= (long) key[tail + 7] << 56;
case 7: state.k1 ^= (long) key[tail + 6] << 48;
case 6: state.k1 ^= (long) key[tail + 5] << 40;
case 5: state.k1 ^= (long) key[tail + 4] << 32;
case 4: state.k1 ^= (long) key[tail + 3] << 24;
case 3: state.k1 ^= (long) key[tail + 2] << 16;
case 2: state.k1 ^= (long) key[tail + 1] << 8;
case 1: state.k1 ^= key[tail + 0];
bmix(state);
}
state.h2 ^= key.length;
state.h1 += state.h2;
state.h2 += state.h1;
state.h1 = fmix(state.h1);
state.h2 = fmix(state.h2);
state.h1 += state.h2;
state.h2 += state.h1;
return new long[] { state.h1, state.h2 };
}
/**
* Hash a value using the x64 64 bit variant of MurmurHash3
*
* @param key value to hash
* @param seed random value
* @return 64 bit hashed key
*/
public static long MurmurHash3_x64_64(final byte[] key, final int seed) {
// Exactly the same as MurmurHash3_x64_128, except it only returns state.h1
State state = new State();
state.h1 = 0x9368e53c2f6af274L ^ seed;
state.h2 = 0x586dcd208f7cd3fdL ^ seed;
state.c1 = 0x87c37b91114253d5L;
state.c2 = 0x4cf5ad432745937fL;
for (int i = 0; i < key.length / 16; i++) {
state.k1 = getblock(key, i * 2 * 8);
state.k2 = getblock(key, (i * 2 + 1) * 8);
bmix(state);
}
state.k1 = 0;
state.k2 = 0;
int tail = (key.length >>> 4) << 4;
switch (key.length & 15) {
case 15: state.k2 ^= (long) key[tail + 14] << 48;
case 14: state.k2 ^= (long) key[tail + 13] << 40;
case 13: state.k2 ^= (long) key[tail + 12] << 32;
case 12: state.k2 ^= (long) key[tail + 11] << 24;
case 11: state.k2 ^= (long) key[tail + 10] << 16;
case 10: state.k2 ^= (long) key[tail + 9] << 8;
case 9: state.k2 ^= key[tail + 8];
case 8: state.k1 ^= (long) key[tail + 7] << 56;
case 7: state.k1 ^= (long) key[tail + 6] << 48;
case 6: state.k1 ^= (long) key[tail + 5] << 40;
case 5: state.k1 ^= (long) key[tail + 4] << 32;
case 4: state.k1 ^= (long) key[tail + 3] << 24;
case 3: state.k1 ^= (long) key[tail + 2] << 16;
case 2: state.k1 ^= (long) key[tail + 1] << 8;
case 1: state.k1 ^= key[tail + 0];
bmix(state);
}
state.h2 ^= key.length;
state.h1 += state.h2;
state.h2 += state.h1;
state.h1 = fmix(state.h1);
state.h2 = fmix(state.h2);
state.h1 += state.h2;
state.h2 += state.h1;
return state.h1;
}
/**
* Hash a value using the x64 32 bit variant of MurmurHash3
*
* @param key value to hash
* @param seed random value
* @return 32 bit hashed key
*/
public static int MurmurHash3_x64_32(final byte[] key, final int seed) {
return (int) (MurmurHash3_x64_64(key, seed) >>> 32);
}
/**
* Hash a value using the x64 128 bit variant of MurmurHash3
*
* @param key value to hash
* @param seed random value
* @return 128 bit hashed key, in an array containing two longs
*/
public static long[] MurmurHash3_x64_128(final long[] key, final int seed) {
State state = new State();
state.h1 = 0x9368e53c2f6af274L ^ seed;
state.h2 = 0x586dcd208f7cd3fdL ^ seed;
state.c1 = 0x87c37b91114253d5L;
state.c2 = 0x4cf5ad432745937fL;
for (int i = 0; i < key.length / 2; i++) {
state.k1 = key[i * 2];
state.k2 = key[i * 2 + 1];
bmix(state);
}
long tail = key[key.length - 1];
// Key length is odd
if ((key.length & 1) == 1) {
state.k1 ^= tail;
bmix(state);
}
state.h2 ^= key.length * 8;
state.h1 += state.h2;
state.h2 += state.h1;
state.h1 = fmix(state.h1);
state.h2 = fmix(state.h2);
state.h1 += state.h2;
state.h2 += state.h1;
return new long[] { state.h1, state.h2 };
}
/**
* Hash a value using the x64 64 bit variant of MurmurHash3
*
* @param key value to hash
* @param seed random value
* @return 64 bit hashed key
*/
public static long MurmurHash3_x64_64(final long[] key, final int seed) {
// Exactly the same as MurmurHash3_x64_128, except it only returns state.h1
State state = new State();
state.h1 = 0x9368e53c2f6af274L ^ seed;
state.h2 = 0x586dcd208f7cd3fdL ^ seed;
state.c1 = 0x87c37b91114253d5L;
state.c2 = 0x4cf5ad432745937fL;
for (int i = 0; i < key.length / 2; i++) {
state.k1 = key[i * 2];
state.k2 = key[i * 2 + 1];
bmix(state);
}
long tail = key[key.length - 1];
if (key.length % 2 != 0) {
state.k1 ^= tail;
bmix(state);
}
state.h2 ^= key.length * 8;
state.h1 += state.h2;
state.h2 += state.h1;
state.h1 = fmix(state.h1);
state.h2 = fmix(state.h2);
state.h1 += state.h2;
state.h2 += state.h1;
return state.h1;
}
/**
* Hash a value using the x64 32 bit variant of MurmurHash3
*
* @param key value to hash
* @param seed random value
* @return 32 bit hashed key
*/
public static int MurmurHash3_x64_32(final long[] key, final int seed) {
return (int) (MurmurHash3_x64_64(key, seed) >>> 32);
}
@Override
public int hash(byte[] payload) {
return MurmurHash3_x64_32(payload, 9001);
}
/**
* Hashes a byte array efficiently.
*
* @param payload a byte array to hash
* @return a hash code for the byte array
*/
public static int hash(long[] payload) {
return MurmurHash3_x64_32(payload, 9001);
}
@Override
public int hash(Object o) {
if (o instanceof byte[])
return hash((byte[]) o);
else if (o instanceof long[])
return hash((long[]) o);
else if (o instanceof String)
return hash(((String) o).getBytes(ISO_8859_1));
else
return hash(o.hashCode());
}
@Override
public boolean equals(Object other) {
return other != null && other.getClass() == getClass();
}
@Override
public int hashCode() {
return 0;
}
@Override
public String toString() {
return "MurmurHash3";
}
}
package com.travelsky.pss.react.cn.fzjh.consistenthash;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import com.travelsky.pss.react.cn.fzjh.jk.HashFunction;
// Adapted from Cloudera Flume's com.cloudera.util.consistenthash.ConsistentHash
// Cloudera Flume source: https://github.com/cloudera/flume
/**
* This is an implementation of a consistent hash. T is the type of a bin.
*
* It is mostly copied from Tom White's implementation found here:
* http://www.lexemetech.com/2007/11/consistent-hashing.html
*
* Blog comments mention that there may be a bug in this implementation -- if
* there is a key collision we may lose bins. Probabilistically this is small,
* and even smaller with a higher more replication factor. This could be made
* even rarer by enlarging the circle by using Long instead of Integer.
*
* getNBins and getNUniqBins return ordered lists of bins for a particular
* object. This is useful for assigning backups if the first bin fails.
*
* This datastructure is not threadsafe.
*/
public class ConsistentHash<T> {
// when looking for n unique bins, give up after a streak of MAX_DUPES
// duplicates
public final static int MAX_DUPES = 10;
// # of times a bin is replicated in hash circle. (for better load balancing)
private final int numberOfReplicas;
private final HashFunction hashFunction;
private final SortedMap<Integer, T> circle = new TreeMap<Integer, T>();
public ConsistentHash(HashFunction hashFunction, int numberOfReplicas,
Collection<T> nodes) {
this.hashFunction = hashFunction;
this.numberOfReplicas = numberOfReplicas;
for (T node : nodes) {
addBin(node);
}
}
/**
* Add a new bin to the consistent hash
*
* This assumes that the bin's toString method is immutable.
*
* This is not thread safe.
*/
public void addBin(T bin) {
for (int i = 0; i < numberOfReplicas; i++) {
// The string addition forces each replica to have different hash
circle.put(hashFunction.hash(bin.toString() + i), bin);
}
}
/**
* Remove a bin from the consistent hash
*
* This assumes that the bin's toString method is immutable.
*
* This is not thread safe.
*/
public void removeBin(T bin) {
for (int i = 0; i < numberOfReplicas; i++) {
// The string addition forces each replica to be different. This needs
// to resolve to the same keys as addBin.
circle.remove(hashFunction.hash(bin.toString() + i));
}
}
/**
* This returns the closest bin for the object. If the object is the bin it
* should be an exact hit, but if it is a value traverse to find closest
* subsequent bin.
*/
public T getBinFor(Object key) {
if (circle.isEmpty()) {
return null;
}
int hash = hashFunction.hash(key);
T bin = circle.get(hash);
if (bin == null) {
// inexact match -- find the next value in the circle
SortedMap<Integer, T> tailMap = circle.tailMap(hash);
hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
bin = circle.get(hash);
}
return bin;
}
/**
* This returns the closest n bins in order for the object. There may be
* duplicates.
*/
public List<T> getNBinsFor(Object key, int n) {
if (circle.isEmpty()) {
return Collections.<T> emptyList();
}
List<T> list = new ArrayList<T>(n);
int hash = hashFunction.hash(key);
for (int i = 0; i < n; i++) {
if (!circle.containsKey(hash)) {
// go to next element.
SortedMap<Integer, T> tailMap = circle.tailMap(hash);
hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
}
list.add(circle.get(hash));
// was a hit so we increment and loop to find the next bin in the
// circle
hash++;
}
return list;
}
/**
* This returns the closest n bins in order for the object. There is extra
* code that forces the bin values to be unique.
*
* This will return a list that has all the bins (and is smaller than n) if n
* > number of bins.
*/
public List<T> getNUniqueBinsFor(Object key, int n) {
if (circle.isEmpty()) {
return Collections.<T> emptyList();
}
List<T> list = new ArrayList<T>(n);
int hash = hashFunction.hash(key);
int duped = 0;
for (int i = 0; i < n; i++) {
if (!circle.containsKey(hash)) {
// go to next element.
SortedMap<Integer, T> tailMap = circle.tailMap(hash);
hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
}
T candidate = circle.get(hash);
if (!list.contains(candidate)) {
duped = 0;
list.add(candidate);
} else {
duped++;
i--; // try again.
if (duped > MAX_DUPES) {
i++; // we've been duped too many times, just skip to next, returning
// fewer than n
}
}
// find the next element in the circle
hash++;
}
return list;
}
}
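As a quick, self-contained illustration of how the ring behaves outside of Spring, something like the following could be used; the ConsistentHashDemo class and its String bins are placeholders for this example only:
import java.util.Arrays;

import com.travelsky.pss.react.cn.fzjh.consistenthash.ConsistentHash;
import com.travelsky.pss.react.cn.fzjh.consistenthash.MurmurHash3;

public class ConsistentHashDemo {
    public static void main(String[] args) {
        // Three bins, each replicated 100 times on the ring.
        ConsistentHash<String> ring = new ConsistentHash<String>(
                MurmurHash3.getInstance(), 100,
                Arrays.asList("127.0.0.1", "127.0.0.2", "127.0.0.3"));

        // The same key always maps to the same bin until the ring changes.
        System.out.println(ring.getBinFor("order-42"));
        // Ordered fallback candidates, useful when the first bin is isolated.
        System.out.println(ring.getNUniqueBinsFor("order-42", 2));
    }
}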
2.4 Consistent hash test results
applicationContext.xml configuration file:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.2.xsd">

    <context:annotation-config />
    <!-- Scan the load balancing components -->
    <context:component-scan base-package="com.travelsky.pss.react.cn.fzjh" />

    <bean id="weightedRoundRoBinLoadBalance"
        class="com.travelsky.pss.react.cn.fzjh.loadBalance.rule.WeightedRoundRoBinLoadBalance"
        init-method="init" />
    <bean id="consistentHashLoadBalance"
        class="com.travelsky.pss.react.cn.fzjh.loadBalance.rule.ConsistentHashLoadBalance"
        init-method="init" />

    <!-- Dynamically reloaded configuration file holding the server weights -->
    <bean id="dynamicUploadConfig"
        class="org.apache.commons.configuration.PropertiesConfiguration"
        init-method="load">
        <property name="file" value="file:config/service.properties" />
        <property name="reloadingStrategy">
            <bean class="org.apache.commons.configuration.reloading.FileChangedReloadingStrategy">
                <property name="refreshDelay" value="60000"></property>
            </bean>
        </property>
    </bean>
</beans>
Test class:
package com.travelsky.pss.react_cn_fzjh;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.travelsky.pss.react.cn.fzjh.loadBalance.rule.ConsistentHashLoadBalance;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:applicationContext.xml" })
public class TestConsistentHashLoadBalance {

    @Autowired
    private transient ConsistentHashLoadBalance consistentHashLoadBalance;

    @Test
    public void testConsistentLoadBalance() throws InterruptedException {
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(15, 20, 60,
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3));
        for (int i = 1; i <= 15; i++) {
            final int index = i;
            poolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("thread: " + Thread.currentThread().getName()
                            + ", serviceKey: 127.0.0." + index
                            + ", server: " + consistentHashLoadBalance
                                    .chooseServerInstance("127.0.0." + index + "#1.0.0").getServerName());
                }
            });
        }
        // Wait for the submitted tasks to finish before the test method returns
        poolExecutor.shutdown();
        poolExecutor.awaitTermination(10, TimeUnit.SECONDS);
    }
}
//////////////////////////////////// Test output (first run) ////////////////////////////////////
thread: pool-1-thread-2, serviceKey: 127.0.0.2, server: 127.0.0.2
thread: pool-1-thread-5, serviceKey: 127.0.0.5, server: 127.0.0.3
thread: pool-1-thread-10, serviceKey: 127.0.0.10, server: 127.0.0.1
thread: pool-1-thread-9, serviceKey: 127.0.0.9, server: 127.0.0.3
thread: pool-1-thread-3, serviceKey: 127.0.0.3, server: 127.0.0.1
thread: pool-1-thread-4, serviceKey: 127.0.0.4, server: 127.0.0.2
thread: pool-1-thread-1, serviceKey: 127.0.0.1, server: 127.0.0.1
thread: pool-1-thread-14, serviceKey: 127.0.0.14, server: 127.0.0.1
thread: pool-1-thread-6, serviceKey: 127.0.0.6, server: 127.0.0.1
thread: pool-1-thread-13, serviceKey: 127.0.0.13, server: 127.0.0.2
thread: pool-1-thread-7, serviceKey: 127.0.0.7, server: 127.0.0.2
thread: pool-1-thread-8, serviceKey: 127.0.0.8, server: 127.0.0.2
thread: pool-1-thread-11, serviceKey: 127.0.0.11, server: 127.0.0.2
thread: pool-1-thread-15, serviceKey: 127.0.0.15, server: 127.0.0.1
thread: pool-1-thread-12, serviceKey: 127.0.0.12, server: 127.0.0.1
//////////////////////////////////// Test output (second run) ////////////////////////////////////
thread: pool-1-thread-1, serviceKey: 127.0.0.1, server: 127.0.0.1
thread: pool-1-thread-2, serviceKey: 127.0.0.2, server: 127.0.0.2
thread: pool-1-thread-10, serviceKey: 127.0.0.10, server: 127.0.0.1
thread: pool-1-thread-5, serviceKey: 127.0.0.5, server: 127.0.0.3
thread: pool-1-thread-14, serviceKey: 127.0.0.14, server: 127.0.0.1
thread: pool-1-thread-6, serviceKey: 127.0.0.6, server: 127.0.0.1
thread: pool-1-thread-3, serviceKey: 127.0.0.3, server: 127.0.0.1
thread: pool-1-thread-7, serviceKey: 127.0.0.7, server: 127.0.0.2
thread: pool-1-thread-11, serviceKey: 127.0.0.11, server: 127.0.0.2
thread: pool-1-thread-15, serviceKey: 127.0.0.15, server: 127.0.0.1
thread: pool-1-thread-9, serviceKey: 127.0.0.9, server: 127.0.0.3
thread: pool-1-thread-13, serviceKey: 127.0.0.13, server: 127.0.0.2
thread: pool-1-thread-4, serviceKey: 127.0.0.4, server: 127.0.0.2
thread: pool-1-thread-8, serviceKey: 127.0.0.8, server: 127.0.0.2
thread: pool-1-thread-12, serviceKey: 127.0.0.12, server: 127.0.0.1
As the output shows, the same request key is hashed to the same server every time.
2.5 Weighted round robin
With weighted round robin, servers with better performance and lower load can be given a higher weight so that they handle a larger share of the requests.
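With the sample weights from the configuration above (100, 300, 200), the implementation below computes gcd = 100, max = 300 and cycle = 100/100 + 300/100 + 200/100 = 6, so one full cycle schedules six requests in a 1:3:2 ratio across the three instances. Tracing the sequence-building loop with that configuration yields an interleaved order such as 127.0.0.2, 127.0.0.2, 127.0.0.3, 127.0.0.1, 127.0.0.2, 127.0.0.3 (the exact interleaving depends on the order of the instance list).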
package com.travelsky.pss.react.cn.fzjh.loadBalance.rule;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import com.travelsky.pss.react.cn.fzjh.abstrac.AbstractLoadBalance;
import com.travelsky.pss.react.cn.fzjh.rule.DynamicUploadRule;
import com.travelsky.pss.react.cn.fzjh.vo.ServiceInstance;

// Weighted round robin load balancer: instances with larger weights are scheduled more often
@Service("weightedRoundRoBinLoadBalance")
public class WeightedRoundRoBinLoadBalance extends AbstractLoadBalance {

    protected final Logger logger = LoggerFactory
            .getLogger(WeightedRoundRoBinLoadBalance.class);

    @Resource(name = "dynamicUploadRule")
    private DynamicUploadRule dynamicUploadRule;

    // Greatest common divisor of all weights
    private int gcd;

    // Largest weight among all instances
    private int max;

    // Length of one full scheduling cycle (sum of weight / gcd)
    private int cycle;

    // Position of the last scheduled entry in the generated sequence
    private int currentIndex = -1;

    /**
     * Pre-computed scheduling sequence. It is rebuilt as a whole and swapped in
     * atomically, so readers never see a half-built list.
     */
    private AtomicReference<List<String>> WRRList = new AtomicReference<List<String>>();

    @Override
    public void init() {
        super.init();
        buildWRRList();
    }

    @Override
    public ServiceInstance chooseServerInstance() {
        if (!isNotEmpty(WRRList.get())) {
            logger.info("Weighted round robin sequence is empty, falling back to the default (random) strategy");
            return super.chooseServerInstance();
        }
        ServiceInstance serviceInstance = null;
        synchronized (this) {
            int index = 0;
            while (index < cycle && null == serviceInstance) {
                currentIndex = (currentIndex + 1) % WRRList.get().size();
                String serviceName = WRRList.get().get(currentIndex);
                serviceInstance = getServiceInstanceByServiceName(serviceName);
                if (null == serviceInstance || serviceInstance.isIsolated()) {
                    // Skip missing or isolated instances and keep walking the sequence
                    serviceInstance = null;
                    index++;
                }
            }
        }
        return serviceInstance;
    }

    /**
     * Build the weighted round robin scheduling sequence.
     */
    private void buildWRRList() {
        boolean isGetSucc = false;
        if (!getServiceInstanceList().isEmpty()) {
            logger.info("Calculating the load factors (gcd, max weight, cycle length)");
            isGetSucc = calcLoadFactors();
        }
        if (isGetSucc) {
            // Number of servers
            int total = getServiceInstanceList().size();
            // Index of the server visited in the previous step
            int i = -1;
            // Current weight threshold
            int cw = 0;
            List<String> newWrrList = new ArrayList<String>(total);
            // Classic interleaved weighted round robin (as used in LVS):
            // repeatedly walk the server list; every time the walk wraps around to
            // the first server, lower the threshold cw by the gcd, resetting it to
            // the maximum weight when it drops to zero or below. A server is
            // scheduled whenever its weight is >= cw, so servers with larger
            // weights appear more often in the generated sequence.
            for (int j = 0; j < cycle; j++) {
                while (true) {
                    i = (i + 1) % total; // move to the next server
                    if (i == 0) {
                        cw = cw - gcd; // completed one pass, lower the threshold
                        if (cw <= 0) {
                            cw = max;
                            // All weights are zero: nothing can be scheduled
                            if (cw == 0) {
                                newWrrList.add(null);
                                break;
                            }
                        }
                    }
                    ServiceInstance serviceInstance = getServiceInstanceList()
                            .get(i);
                    String serverName = serviceInstance.getServerName();
                    // Schedule this server if its weight reaches the current threshold
                    if (serviceInstance.getQzValue() >= cw) {
                        newWrrList.add(serverName);
                        break;
                    }
                }
            }
            WRRList.set(newWrrList);
        }
    }

    /**
     * Calculate the gcd, the maximum weight and the cycle length from the
     * configured weights.
     *
     * @return true if the weights could be read, false otherwise
     */
    private boolean calcLoadFactors() {
        // The weights could come from different sources (static configuration,
        // a database, a monitoring system, ...); here they are simply read from
        // the dynamically reloaded configuration file.
        List<Integer> factors = getDefault();
        if (null == factors || factors.size() == 0) {
            return false;
        }
        // e.g. for weights 10 and 20 the gcd is 10
        gcd = calcMaxGCD(factors);
        max = calcMaxValue(factors);
        cycle = calcCycle(factors, gcd);
        return true;
    }

    /**
     * Cycle length = sum of (weight / gcd).
     * e.g. 100/100 + 200/100 + 300/100 = 6
     *
     * @param factors weight list
     * @param gcd     greatest common divisor of the weights
     * @return cycle length
     */
    private int calcCycle(List<Integer> factors, int gcd) {
        int cycle = 0;
        for (int i = 0; i < factors.size(); i++) {
            cycle += factors.get(i) / gcd;
        }
        return cycle;
    }

    /**
     * Largest weight in the list.
     *
     * @param factors weight list
     * @return maximum weight
     */
    private int calcMaxValue(List<Integer> factors) {
        int max = 0;
        for (int i = 0; i < factors.size(); i++) {
            if (factors.get(i) > max) {
                max = factors.get(i);
            }
        }
        return max;
    }

    /**
     * Greatest common divisor of all weights.
     *
     * @param factors weight list
     * @return greatest common divisor
     */
    private int calcMaxGCD(List<Integer> factors) {
        int max = 0;
        for (int i = 0; i < factors.size(); i++) {
            if (factors.get(i) > 0) {
                max = divisor(factors.get(i), max);
            }
        }
        return max;
    }

    /**
     * Greatest common divisor of two numbers (subtraction-based Euclidean algorithm).
     *
     * @param m first number
     * @param n second number
     * @return the gcd of m and n
     */
    private int divisor(int m, int n) {
        if (m < n) {
            int temp;
            temp = m;
            m = n;
            n = temp;
        }
        if (0 == n) {
            return m;
        }
        return divisor(m - n, n);
    }

    /**
     * Read the configured weight of every instance.
     *
     * @return list of weights
     */
    private List<Integer> getDefault() {
        List<Integer> list = new ArrayList<Integer>();
        List<ServiceInstance> instances = dynamicUploadRule.getServiceInstanceRule();
        for (final ServiceInstance serviceInstance : instances) {
            list.add(serviceInstance.getQzValue());
        }
        return list;
    }

    /**
     * @param list the scheduling sequence
     * @return true if the list is non-null and non-empty
     */
    private boolean isNotEmpty(List<String> list) {
        return null != list && list.size() > 0;
    }
}
2.6 Weighted round robin test results
package com.travelsky.pss.react_cn_fzjh;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.travelsky.pss.react.cn.fzjh.loadBalance.rule.WeightedRoundRoBinLoadBalance;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:applicationContext.xml" })
public class TestWeightRoundRoBinLoadBalance {

    @Autowired
    private transient WeightedRoundRoBinLoadBalance weightedRoundRoBinLoadBalance;

    @Test
    public void testWeightedRoundRoBinLoadBalance() throws InterruptedException {
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(15, 20, 60,
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3));
        for (int i = 1; i <= 15; i++) {
            poolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("thread: " + Thread.currentThread().getName()
                            + ", server: " + weightedRoundRoBinLoadBalance.chooseServerInstance().getServerName());
                }
            });
        }
        // Wait for the submitted tasks to finish before the test method returns
        poolExecutor.shutdown();
        poolExecutor.awaitTermination(10, TimeUnit.SECONDS);
    }
}
//////////////////////////////////// Test output ////////////////////////////////////
thread: pool-1-thread-2, server: 127.0.0.2
thread: pool-1-thread-1, server: 127.0.0.2
thread: pool-1-thread-5, server: 127.0.0.3
thread: pool-1-thread-6, server: 127.0.0.1
thread: pool-1-thread-3, server: 127.0.0.2
thread: pool-1-thread-9, server: 127.0.0.3
thread: pool-1-thread-10, server: 127.0.0.2
thread: pool-1-thread-13, server: 127.0.0.2
thread: pool-1-thread-11, server: 127.0.0.1
thread: pool-1-thread-7, server: 127.0.0.3
thread: pool-1-thread-15, server: 127.0.0.2
thread: pool-1-thread-4, server: 127.0.0.3
thread: pool-1-thread-8, server: 127.0.0.2
thread: pool-1-thread-12, server: 127.0.0.2
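In this run 127.0.0.1 is chosen twice, 127.0.0.3 four times and 127.0.0.2 eight times, which roughly follows the configured weights of 100, 200 and 300.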
Source code download: