RedisクラスタモードでScanとpiplineをサポートしない解釈と解決策について

7826 ワード

最近、プログラムを書く過程で問題が発生しました.redisのkey値が文字列で始まるデータを削除する必要があります.資料を調べることでkeysを使用するとデッドロックが発生し、速度が遅くなる可能性があるため、様々な資料を調べることでscanを使用することが望ましいが、実際の開発過程で単一ノードのredisがscanをサポートしていることが分かったが、クラスタ環境ではscanコマンドはサポートされていない(本人はredistemplate、JedisはRedis公式推奨のJava向け操作Redisのクライアントであり、RedisTemplateはSpringDataRedisにおけるJedisApiの高度パッケージである.SpringDataRedisはJedisに対してRedisのJavaクライアントを容易に交換することができ、Jedisより接続プールの特性を自動管理し、他のSpringフレームワークとの組み合わせに便利である.例えば:SpringCache)は、それだけでなく、以前よく知られていたPiplineパイプ操作もクラスタ環境をサポートしていない.様々なネット上のブログやredisの公式サイトをめくることで、javaにおけるredisクラスタ環境でのscanとpiplineの使用を簡単に実現した.なぜこの2つの操作はredisクラスタ環境ではサポートされていないのか.単一ノードの場合redisが16384スロットしかない場合、piplineを使用して1回の接続のみを確立し、ループ操作を行うとまずバッファにデータを入れ、1回にデータを送信して取得するので、取得ごとに1回の接続を確立するよりもはるかに性能が高いに違いないが、クラスタ化モードでは、複数のノードが16384スロットを共有し、クラスタに3つのノードがあると仮定すると、3つのノードが16384個のスロットビットに分割され、1つのデータを入れるとkeyがhashされ、hash以降の値に基づいて対応するスロットビットが見つかり、対応するノードが挿入されるが、piplineを使用すると問題が発見される.piplineは接続を確立するが、操作するkeyは異なるスライス上にある可能性があるため、このときリクエストの転送が必要ですが、これはpiplineの考え方に反しているため、クラスタモードではpiplineの操作はサポートされていません.scanは同じ理屈で、異なるkeyは異なるノードにある可能性がありますが、scanは1つのノード上のデータしか操作できません.対応するスロットを特定して分類します.同じpipeline内のすべてのキーが1つのノードに対応していることを保証すればよいので、最後にpipelineで実行します.もちろんscanも同様です.またmgetとmsetもあります.しかしscanの解決策は、各ノードを取得して各ノードのscanのコマンドでkey値を探すことです.piplineとscanで使用されるコードについて実装:(1)Pipline:
public static List clusterPiplineGetAndSet(List keys) throws Exception {

    //  key      
    RedisSerializer keySerializer = redisTemplate.getKeySerializer();
    //   map redis    key  list value,   list            key 
    HashMap> nodeKeyMap = new HashMap<>(8);
    List result = new ArrayList<>(8);
    //         
    RedisClusterConnection redisClusterConnection = redisTemplate.getConnectionFactory().getClusterConnection();
    try {
        //      key   ,       
        Iterable redisClusterNodes = redisClusterConnection.clusterGetNodes();
        for (RedisClusterNode redisClusterNode : redisClusterNodes) {
            //          
            RedisClusterNode.SlotRange slotRange = redisClusterNode.getSlotRange();
            for (String key : keys) {
                //  redis key hash     key     
                int slot = JedisClusterCRC16.getSlot(key);
                if (slotRange.contains(slot)) {
                    List list = nodeKeyMap.get(redisClusterNode);
                    if (null == list) {
                        list = new ArrayList<>();
                        nodeKeyMap.putIfAbsent(redisClusterNode, list);
                    }
                    //    key    
                    list.add(key);
                }
            }
        }
        //         redis             map
        for (Map.Entry> clusterNodeListEntry : nodeKeyMap.entrySet()) {
            //    
            RedisClusterNode redisClusterNode = clusterNodeListEntry.getKey();
            //        JedisPool      jedis redistemplate            。
            JedisPool jedisPool = ((JedisCluster) redisClusterConnection.getNativeConnection()).getClusterNodes()
                    .get(new HostAndPort(redisClusterNode.getHost(), redisClusterNode.getPort()).toString());
            List nodeListEntryValue = clusterNodeListEntry.getValue();
            byte[][] arr = new byte[nodeListEntryValue.size()][];
            int count = 0;
            //  key  
            for (String nodeKey : nodeListEntryValue) {
                //                             byte   。
                arr[count++] = keySerializer.serialize(nodeKey);
            }
            //         jedis  
            Jedis jedis = jedisPool.getResource();
            List> responses = new ArrayList<>();
            try {
                //        pipline      。
                Pipeline pipeline = jedis.pipelined();
                //************************            pipline    set           ******************************
                
                // redis    
                for (String nodeKey : nodeListEntryValue) {
                    responses.add(pipeline.get(keySerializer.serialize(nodeKey)));
                }
                List objects = pipeline.syncAndReturnAll();
                result = objects;

                 // redis    
                /* for (String nodeKey : nodeListEntryValue) {
                     pipeline.set(keySerializer.serialize(nodeKey),valueSerializer.serialize("nnnn"));
                 }
                 pipeline.sync();*/
                
                //*************************************************************************************************************
                
                pipeline.close();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                jedis.close();
            }

        }
    } finally {
        RedisConnectionUtils.releaseConnection(redisClusterConnection, redisTemplate.getConnectionFactory());
    }
    return result;
}

(2)Scan:
    public static List getRedisKeys(String matchKey) {

    List list = new ArrayList<>();
    RedisClusterConnection redisClusterConnection = redisTemplate.getConnectionFactory().getClusterConnection();
    //     edispool           pipline     ,         
    Map clusterNodes = ((JedisCluster) redisClusterConnection.getNativeConnection()).getClusterNodes();
    for (Map.Entry entry : clusterNodes.entrySet()) {
        //     jedis  
        Jedis jedis = entry.getValue().getResource();
        //       (       ,               ),                
        if (!jedis.info("replication").contains("role:slave")) {
            List keys = getScan(jedis, matchKey);
            if (keys.size() > 0) {
                Map> map = new HashMap<>(8);
                //           ,    
                for (String key : keys) {
                    // cluster     key     ,  key      slot ,    :JedisDataException:
                    int slot = JedisClusterCRC16.getSlot(key);
                    //  slot key  ,  slot key    
                    if (map.containsKey(slot)) {
                        map.get(slot).add(key);
                    } else {
                        List list1 = new ArrayList();
                        list1.add(key);
                        map.put(slot, list1);
                    }
                }
                for (Map.Entry> integerListEntry : map.entrySet()) {
                    list.addAll(integerListEntry.getValue());
                }
            }
        }
    }
    return list;
}

public static List getScan(Jedis redisService, String key) {
    
    List list = new ArrayList<>();
    //            
    ScanParams params = new ScanParams();
    params.match(key);
    //       ,              
    params.count(100);
    String cursor = "0";
    ScanResult scanResult = redisService.scan(cursor, params);

    //scan.getStringCursor()        0    ,        
    while (null != scanResult.getStringCursor()) {
        //       
        list.addAll(scanResult.getResult());
        if (! "0".equals( scanResult.getStringCursor())) {
            scanResult = redisService.scan(cursor, params);
        } else {
            break;
        }
    }
    return list;
}

以上はpiplineとscanのクラスタ環境での実現過程であり、これはオフラインのテスト過程の具体的なredistemplateにすぎず、業務上で自分で実現する必要があり、同時にいくつかの異常操作の安全操作も開発者が自分で実現する必要がある.