Redis cluster使用pipeline
4581 ワード
一般的な解決の考え方
redisクラスタには16384個のslotがあり、例えば3個のノードがあり、各ノードに割り当てられる可能性のあるslotはノードAが0−5500であり、ノードBが5501−11000であり、ノードCが11001−16383である.pipelineはノードに基づいているので、pipelineでいくつかのkeyの値をクエリーするには、
ソリューション
前節で述べた手順は,実際には
Demo
redisクラスタには16384個のslotがあり、例えば3個のノードがあり、各ノードに割り当てられる可能性のあるslotはノードAが0−5500であり、ノードBが5501−11000であり、ノードCが11001−16383である.pipelineはノードに基づいているので、pipelineでいくつかのkeyの値をクエリーするには、
JedisClusterCRC16.getSlot(key)
でkeyのslot値を計算し、上の各ノードのslot分布を通じて、どのkeyがどのノードにあるべきかを知る必要があります.このノードのJedisPoolを取得するとpipelineで読み書きができます.上記のプロセスを実装するには多くの方法があり、本稿ではコード量が最も少ない解決方法を紹介する.この文書はredis 3.2.9に基づいています(redisクラスタのインストール方法については、「」を参照してください.https://www.jianshu.com/p/64d05c4e0ae2)および
redis.clients
jedis
2.9.0
ソリューション
前節で述べた手順は,実際には
JedisClusterInfoCache
のオブジェクトの中で開発者の実現を支援したが,このオブジェクトはJedisClusterConnectionHandler
の中でprotected
で対外開放されておらず,JedisCluster
のAPIを介してもJedisClusterConnectionHandler
のオブジェクトを入手できなかった.したがって、これらのオブジェクトは、次の2つのクラスによって露出され、getJedisPoolFromSlot
を使用すると、keyごとに対応するJedisPoolが分かるようになります.class JedisClusterPlus extends JedisCluster {
public JedisClusterPlus(Set jedisClusterNode, int connectionTimeout, int soTimeout, final GenericObjectPoolConfig poolConfig) {
super(jedisClusterNode);
super.connectionHandler = new JedisSlotAdvancedConnectionHandler(jedisClusterNode, poolConfig,
connectionTimeout, soTimeout);
}
public JedisSlotAdvancedConnectionHandler getConnectionHandler() {
return (JedisSlotAdvancedConnectionHandler)this.connectionHandler;
}
}
public class JedisSlotAdvancedConnectionHandler extends JedisSlotBasedConnectionHandler{
public JedisSlotAdvancedConnectionHandler(Set nodes, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout) {
super(nodes, poolConfig, connectionTimeout, soTimeout);
}
public JedisPool getJedisPoolFromSlot(int slot) {
JedisPool connectionPool = cache.getSlotPool(slot);
if (connectionPool != null) {
// It can't guaranteed to get valid connection because of node
// assignment
return connectionPool;
} else {
renewSlotCache(); //It's abnormal situation for cluster mode, that we have just nothing for slot, try to rediscover state
connectionPool = cache.getSlotPool(slot);
if (connectionPool != null) {
return connectionPool;
} else {
throw new JedisNoReachableClusterNodeException("No reachable node in cluster for slot " + slot);
}
}
}
}
Demo
public class Tester {
public static void main(String[] args) {
Set jedisClusterNode = new HashSet<>();
HostAndPort hostAndPort1 = new HostAndPort("hostA",7000);
HostAndPort hostAndPort2 = new HostAndPort("hostB",7001);
HostAndPort hostAndPort3 = new HostAndPort("hostC",7002);
jedisClusterNode.add(hostAndPort1);
jedisClusterNode.add(hostAndPort2);
jedisClusterNode.add(hostAndPort3);
JedisClusterPlus jedisClusterPlus = new JedisClusterPlus(jedisClusterNode, 2000, 2000, new JedisPoolConfig());
JedisSlotAdvancedConnectionHandler jedisSlotAdvancedConnectionHandler = jedisClusterPlus.getConnectionHandler();
String[] testKeys = {"foo","bar","xyz"};
Map> poolKeys = new HashMap<>();
for (String key : testKeys) {
int slot = JedisClusterCRC16.getSlot(key);
JedisPool jedisPool = jedisSlotAdvancedConnectionHandler.getJedisPoolFromSlot(slot);
if (poolKeys.keySet().contains(jedisPool)){
List keys = poolKeys.get(jedisPool);
keys.add(key);
}else {
List keys = new ArrayList<>();
keys.add(key);
poolKeys.put(jedisPool, keys);
}
}
for (JedisPool jedisPool : poolKeys.keySet()) {
Jedis jedis = jedisPool.getResource();
Pipeline pipeline = jedis.pipelined();
List keys = poolKeys.get(jedisPool);
keys.forEach(key ->pipeline.get(key));
List result = pipeline.syncAndReturnAll();
System.out.println(result);
jedis.close();
}
}
}