RedisコンシステンシHashクライアント実装
6746 ワード
RedisコンシステンシHashクライアント実装
Redisパーティションには、コンシステンシHashをサポートするクライアントを使用する実装方法があります.
Javaにはこのタイプのクライアントがないようなので、自分で実現したいと思います.
げんり
一致性hashの原理、私は簡単に述べて、具体的にここを見ることができますまず、私たちのN個のインスタンスをhash()によって周長2^32−1の長さのリングに散らす私のやり方は、このインスタンスの番号としてランダム文字列を直接各インスタンスに生成し、リング上の位置としてその番号のhashCode()を取得することである.我々のN個のインスタンスのN個の位置を[P 1,P 2,...,Pn] とする.毎回set 1個(key,value)の時、 は、keyをhash()によって同様にリング上の位置P に対応するものを得る. Pの後継例Px を探す. Pxインスタンス に値をセット
毎回keyをgetするたびに、 keyをhash()によりリングに対応する位置P を得る. Pの後継例Px を探す. Pxインスタンス取得値 インスタンスを追加する場合 第1点を参照して一つの位置Pm を得る. Pの後継例Px を探す. get Pxインスタンス上のすべてのデータは、Pxをクリアし、これらのデータを再set する.
インスタンスを削除する場合 削除するインスタンスのすべてのデータを、そのインスタンスの後続の実Px例に記入する.
例ダウンタイムの場合 は、レプリケーション機能がオンでない場合、インスタンスのデータは失われますが、クラスタ全体が使用可能です. 複製機能をオンにし、複製係数が2であると仮定すると、setごとに2つのインスタンス、すなわち現在ヒットしているインスタンスPxにデータを書き込むとともに、Pxの後継インスタンスPx+1にデータを書き込む.これにより、2つの不連続ノードのダウンタイムを許容することができる.しかし同時に書き込み効率は 低下.
後継例を定義する:[P 1,P 2,...,Pn]でPより大きいすべての集合の中で最も小さいPxを探し、Pがすでに最大であればPx=min{P 1,P 2,...,Pn}
低レイアウトコード(コピー機能なし)
まとめ以前はコンシステンシhashアルゴリズムが万能だと思っていたが、今日テストした後、ノードをリングの上に均一に散らすことは不可能であることが分かった.結局、インスタンスの個数は2^32に対して確かに小さすぎて、今のところこの問題をよりよく解決する方法が考えられていない.この問題はRedisClusterがコンシステンシアルゴリズムを使用しない原因ではないか. Kafkaの
Redisパーティションには、コンシステンシHashをサポートするクライアントを使用する実装方法があります.
Javaにはこのタイプのクライアントがないようなので、自分で実現したいと思います.
げんり
一致性hashの原理、私は簡単に述べて、具体的にここを見ることができます
後継例を定義する:[P 1,P 2,...,Pn]でPより大きいすべての集合の中で最も小さいPxを探し、Pがすでに最大であればPx=min{P 1,P 2,...,Pn}
低レイアウトコード(コピー機能なし)
public class ConsistentHashClient {
private TreeMap mapped;
public ConsistentHashClient(String... args) {
mapped = new TreeMap<>();
//
for (String host : args) {
String hostName = host.split(":")[0];
int port = Integer.parseInt(host.split(":")[1]);
Node node = new Node(hostName, port);
int index = mapToNode(node);
mapped.put(index, node);
}
}
public void set(String key, String value) {
Node redis = getTrulyNode(key);
log.info("set key = {} , value = {} into {}",key,value,redis.hostName+":"+redis.port);
redis.set(key, value);
}
public String get(String key) {
Node redis = getTrulyNode(key);
String value = redis.get(key);
log.info("get key = {} , value = {} get from {}",key, value, redis.hostName + ":" + redis.port);
return value;
}
public int addNode(Node newNode) {
log.info("start to addNode, hostName = {} , port = {} ",newNode.hostName,newNode.port);
int index = mapToNode(newNode);
if (mapped.get(index) != null) {
throw new IllegalArgumentException(" redis ");
} else {
mapped.put(index, newNode);
}
Node next = getNextNode(index);
if (next != null) {
Map cache = next.keys();
next.empty();
//
for (Map.Entry entry : cache.entrySet()) {
set(entry.getKey(), entry.getValue());
}
}
log.info("finish to addNode, hostName = {} , port = {} ",newNode.hostName,newNode.port);
return index;
}
public Node deleteNode(String hostName, int port) {
log.info("start to delete node, hostName = {},port = {}",hostName,port);
for (Map.Entry node : mapped.entrySet()) {
if (hostName.equals(node.getValue().hostName) && port == node.getValue().port) {
return deleteNode(node.getKey());
}
}
throw new IllegalArgumentException(" Ip ");
}
private Node getNextNode(int index) {
Map.Entry next = mapped.higherEntry(index);
if (next == null) {
next = mapped.higherEntry(Integer.MIN_VALUE);
}
if (next == null) throw new IllegalStateException(" Redis ");
return next.getValue();
}
private String info(){
StringBuilder sb = new StringBuilder("\r
**********************************\r
");
for(Map.Entry entry : mapped.entrySet()) {
sb.append(entry.getValue().info()).append("**********************************\r
");
}
return sb.toString();
}
private Node deleteNode(int index) {
Node cur = mapped.get(index);
if (cur != null) {
Node next = getNextNode(index);
if (next != null) {
for (Map.Entry entry : cur.keys().entrySet()) {
next.set(entry.getKey(), entry.getValue());
}
} else {
log.info("data all lost");
}
mapped.remove(index);
log.info("start to delete node, hostName = {},port = {}",cur.hostName,cur.port);
}
return cur;
}
private Node getTrulyNode(String key) {
int index = mapToNode(key);
return getNextNode(index - 1);
}
private int mapToNode(Object key) {
return key.hashCode();
}
private static class Node {
Node(String hostName, int port) {
this.hostName = hostName;
this.port = port;
cache = new Jedis(hostName, port);
this.no = UUID.randomUUID().toString();
}
private String hostName = "localhost";
private int port = 6379;
private String no;
private Jedis cache;
public void set(String key, String value) {
cache.set(key, value);
}
public String get(String key) {
return cache.get(key);
}
@Override
public int hashCode() {
return no.hashCode();
}
public Map keys() {
Map all = new HashMap<>();
Set keySet = cache.keys("*");
for (String key : keySet) {
all.put(key, cache.get(key));
}
return all;
}
public void empty() {
cache.flushDB();
}
public String info(){
return cache.info();
}
}
private static final String TEST_HOST = "172.18.100.75";
public static void main(String[] args) {
ConsistentHashClient client = new ConsistentHashClient(TEST_HOST+":6381",TEST_HOST + ":6379",TEST_HOST+":6380");
List keys = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
String key = UUID.randomUUID().toString();
keys.add(key);
client.set(key, key);
}
for (int i = 0; i < 1000; i++) {
if (!client.get(keys.get(i)).equals(keys.get(i))) throw new IllegalStateException(" ");
}
log.info(" {} , :{}",client.mapped.size(),client.info());
}
}
まとめ
replication-factor
を参照して、私は自分でこの複製機能を考え出して、今のところまだ実現していません.第一点は私に衝動がなくなったので、みんなは良い方法があって私に提供することができます[email protected]