ServiceStackの改造RedisはRedis 3をサポートする.xクラスタモード

19965 ワード

Redisはずっと使っていますが、現在のRedis 3.*バージョンが登場し、Redisクラスタがサポートされていることは、私たちの長い間のRedis拡張の難題を解決します.しかし、テストを通じて私は喜んで悲しんで、私はずっとServiceStackを使っています.RedisはRedisに関する操作を行い、既存のdllバージョンではRedis 3を満たすことができない.0の下のクラスタモードは、やっと今時間があったので、やることができます.
では、ServiceStackをどうするか.Redisサポートクラスタは、redis clusterの公式ドキュメントを読むことで答えを見つけることができますが、実は簡単です.
  • Redis 3を知る.0のクラスタモード
  • Hash Slotの計算方法を理解する
  • ServiceStackについて説明します.Redis
  • 切込み点を探す
  • コード実装
  • 一、Redis 3.0クラスタモード


    現在Redis 3.0主要モードは主従設計であり、主ノードのアルゴリズムに少数服従多数を採用し、各主ノードはN-1個のコピーを生成して主従複製を行い、そのうちの1つの主ノードが失敗すると、クラスタは自動的にそのうちの1つをノードから主に昇格してクラスタ全体の正常な動作を維持し続ける.

    二、Hash Slot


    Redis3.0はHash Slot上で16384個に固定設定.これらのslotは各ノードに平均的に分布する.
    次のようになります.
    Node A contains hash slots from 0 to 5500.
    Node B contains hash slots from 5501 to 11000.
    Node C contains hash slots from 11001 to 16384.
    指定キーを取得すると、Slotのアルゴリズムにより、操作キーがどのマスターノード内に存在するかを計算するが、Slot=CRC 16(KEY)%16384となる.
    key=aのように彼のslotは15495と計算された.
    ABCの3つのノードのslotsを比較することにより、このキーをノードCに書き込むべきであると断定する.

    三、ServiceStack.Redis


    クラス図の構造:
    上記のクラス図ではstringクライアントに含まれるこれらのインタフェースに注目し,クラス図からRedis操作のすべての機能をカプセル化し,インタフェースオブジェクトの形式で具体的な操作のコード記述を実現する.
    ソース分析を開きます.RedisClientクラスは操作の具体的な実装クラスで、partial classプロパティを使用して複数のファイルを使用してクラスをマージします.
    PooledRedisClientManagerクラスはRedisClientの管理クラスとしてClientの割り当てとフェイルオーバなどの管理操作を行う.
    少なくとも私はこの2つのクラスがこのように組み合わせて使われていると思っています.

    四、切り込み点


    どのように手をつけるかを探していると、前述の内容を見て、PooledRedisClientManagerオブジェクトに基づいてRedisClientを初期化し、Keys操作に基づいてRedisClientを取得するたびに、パラメータレスGetClient()をGetClient(key)に拡張してkeyのSlotに基づいて戻るRedisClientオブジェクトを選択することを想定することができます.これにより、毎回正しいノードアドレスで接続操作を行うことができる、同様に読み出し操作も同様である.
    構想手順:
  • はServiceStackである.Redisはクラスタモードの属性を増加し、クラスタモード
  • のオン/オフを決定する.
  • Cluster拡張クラスを追加し、いくつかのCluterモードの共通の方法をサポートする.このような同様のpartial class RedisClientは、Cluster Nodes情報を照会する機能、Hash Slot計算方法、ノード情報の実体属性クラスを提供する.
  • パラメータ付きGetClient(key)メソッドを追加する、keyのslotを計算することによって対応するRedisClientを返す.

  • そんなに簡単です!

    五、コード実現


    コードを修正すると同時に、既存のコード構造を破壊しないために、if elseを使用してコードドメインを分離し、私たちの不注意によって以前のプログラムに問題が発生することを防止します.もちろん、十分な把握があれば、この関数を直接書き直すこともできます.
    PooledRedisClientManagementの変更
    /// 
    /// wanglei add
    /// open redis 3.0 cluster mode
    /// 
    public bool? OpenCluster { get; set; }
    

    private void InitClient(RedisClient client)メソッドを探し、初期化判定を加える.
        private void InitClient(RedisClient client)
        {
            if (this.ConnectTimeout != null)
                client.ConnectTimeout = this.ConnectTimeout.Value;
            if (this.SocketSendTimeout.HasValue)
                client.SendTimeout = this.SocketSendTimeout.Value;
            if (this.SocketReceiveTimeout.HasValue)
                client.ReceiveTimeout = this.SocketReceiveTimeout.Value;
            if (this.IdleTimeOutSecs.HasValue)
                client.IdleTimeOutSecs = this.IdleTimeOutSecs.Value;
            if (this.ForceReconnectInIdle.HasValue)
                client.ForceReconnectInIdle = this.ForceReconnectInIdle.Value;
            else
                client.ForceReconnectInIdle = false;
            if (this.NamespacePrefix != null)
                client.NamespacePrefix = NamespacePrefix;
            if (Db != null && client.Db != Db) //Reset database to default if changed
                client.ChangeDb(Db.Value);
    
            //wanglei add
            //close the cluster mode default.
            if (OpenCluster.HasValue)
            {
                client.OpenCluster = this.OpenCluster.HasValue;
            }
            else
            {
                client.OpenCluster = false;
                this.OpenCluster = false;
            }
            //wanglei add
            if (this.OpenCluster.Value)
            {
                InitClusterInfo(client);
            }
        }
    
        public List ClusterNodes;//wanglei add
        private bool _loadClusterNodes = false;//wanglei add
    
        /// 
        /// init cluster info
        /// wanglei add
        /// 
        /// 
        public void InitClusterInfo(RedisClient client)
        {
            if (_loadClusterNodes) return;
            ClusterNodes = new List();
            string host = string.Format("{0}:{1}", client.Host, client.Port);
            string nodes = client.GetClusterInfo();
            using (var reader = new StringReader(nodes))
            {
                string line;
                while ((line = reader.ReadLine()) != null)
                {
                    if (string.IsNullOrWhiteSpace(line)) continue;
                    ClusterNodes.Add(new ClusterNode(line));
                }
            }
    
            client.FillSlot(ClusterNodes);
    
            //according cluster nodes to reset the read or write client
            //1. the slave node as read from the client
            //2. the master node as write from the client            
            lock (readClients)
            {
                readClients = new RedisClient[0];
                ReadOnlyHosts = ClusterNodes.Where(w => w.Slave == true).Select(s => string.Format("{0}:{1}", s.Host, s.Port))
                    .ToRedisEndPoints();
            }
    
            lock (writeClients)
            {
                writeClients = new RedisClient[0];
                ReadWriteHosts = ClusterNodes.Where(w => w.Slave == false).Select(s => string.Format("{0}:{1}", s.Host, s.Port))
                    .ToRedisEndPoints();
            }
                       
    
            _loadClusterNodes = true;
        }
    

    パラメータなしのプロシージャは、変更する関数に関連しています.
    //書込みクライアントの取得
    public IRedisClient GetClient()
    //ロールに書き込まれたクライアントを取得
    private RedisClient GetInActiveWriteClient()--この関数は、private RedisClient GetInActiveWriteClient(string key)によって変更する.
    //リードクライアントの取得
    public virtual IRedisClient GetReadOnlyClient()
    //リードロールを取得するクライアント
    private RedisClient GetInActiveReadClient()--この関数は、private RedisClient GetInActiveReadClient(string key)によって変更する.
    以上の4つの関数はManage類の主要な入口点であり、関数の再ロードを利用して彼らのために1つのパラメータkeyを拡張してslot計算をサポートする必要がある.
        /// 
        /// get a readwriteclient by key slot
        /// wanglei add
        /// 
        /// the redis keyword
        /// 
        public IRedisClient GetClient(string key)
        {
            lock (writeClients)
            {
                if (!OpenCluster.Value) //open cluster mode wanglei add
                {
                    AssertValidReadWritePool();
                }
    
                RedisClient inActiveClient;
                while ((inActiveClient = GetInActiveWriteClient(key)) == null)
                {
                    if (PoolTimeout.HasValue)
                    {
                        // wait for a connection, cry out if made to wait too long
                        if (!Monitor.Wait(writeClients, PoolTimeout.Value))
                            throw new TimeoutException(PoolTimeoutError);
                    }
                    else
                        Monitor.Wait(writeClients, RecheckPoolAfterMs);
                }
    
                WritePoolIndex++;
                inActiveClient.Active = true;
    
                InitClient(inActiveClient);
    
                return inActiveClient;
            }
        }
    
        /// 
        /// Returns a Read/Write client (The default) using the hosts defined in ReadWriteHosts
        /// 
        /// 
        public IRedisClient GetClient()
        {   
            //              .
            return this.GetClient(string.Empty);
        }
    
        /// 
        /// Called within a lock
        /// wanglei add
        /// 
        /// 
        private RedisClient GetInActiveWriteClient(string key)
        {
            //if key is empty then will think not cluster mode (default).
            int slot = -1;
            if (OpenCluster.Value && !string.IsNullOrWhiteSpace(key))
            {
                slot = RedisClient.HashSlot(key);
            }
            if (OpenCluster.Value && slot > -1) //open cluster mode wanglei add
            {
                ClusterNode clusterNode = this.ClusterNodes.FirstOrDefault(s => s.Slot1 <= slot && s.Slot2 >= slot && s.Slave == false);
                if (clusterNode == null)
                {
                    return null;
                }
                RedisEndpoint rePoint = ReadWriteHosts.FirstOrDefault(f => 
                        f.Host == clusterNode.Host && f.Port == clusterNode.Port
                    );
                if (rePoint == null)
                {
                    return null;
                }
                return InitNewClient(rePoint);
            }
            else
            {
                var desiredIndex = WritePoolIndex % writeClients.Length;
                //this will loop through all hosts in readClients once even though there are 2 for loops
                //both loops are used to try to get the prefered host according to the round robin algorithm
                for (int x = 0; x < ReadWriteHosts.Count; x++)
                {
                    var nextHostIndex = (desiredIndex + x) % ReadWriteHosts.Count;
                    RedisEndpoint nextHost = ReadWriteHosts[nextHostIndex];
                    for (var i = nextHostIndex; i < writeClients.Length; i += ReadWriteHosts.Count)
                    {
    
                        if (writeClients[i] != null && !writeClients[i].Active && !writeClients[i].HadExceptions)
                            return writeClients[i];
                        else if (writeClients[i] == null || writeClients[i].HadExceptions)
                        {
                            if (writeClients[i] != null)
                                writeClients[i].DisposeConnection();
    
                            var client = InitNewClient(nextHost);
                            writeClients[i] = client;
    
                            return client;
                        }
    
                    }
                }
            }
            return null;
        }
    
        /// 
        /// get a readonlyclient by key slot
        /// wanglei add
        /// 
        /// 
        /// 
        public IRedisClient GetReadOnlyClient(string key)
        {           
            lock (readClients)
            {
                if (!OpenCluster.Value) //wanglei add
                {
                    AssertValidReadOnlyPool();
                }
                RedisClient inActiveClient;
                while ((inActiveClient = GetInActiveReadClient(key)) == null)
                {
                    if (PoolTimeout.HasValue)
                    {
                        // wait for a connection, cry out if made to wait too long
                        if (!Monitor.Wait(readClients, PoolTimeout.Value))
                            throw new TimeoutException(PoolTimeoutError);
                    }
                    else
                        Monitor.Wait(readClients, RecheckPoolAfterMs);
                }
    
                ReadPoolIndex++;
                inActiveClient.Active = true;
                InitClient(inActiveClient);
                return inActiveClient;
            }
        }
    
    
    
        /// 
        /// Returns a ReadOnly client using the hosts defined in ReadOnlyHosts.
        /// 
        /// 
        public virtual IRedisClient GetReadOnlyClient()
        {
            return GetReadOnlyClient(string.Empty);//wanglei add
        }
    
    
    
        /// 
        /// According to the key value Called within a lock 
        /// wanglei add
        /// 
        /// 
        private RedisClient GetInActiveReadClient(string key)
        {
            //if key is empty then will think not cluster mode (default).
            int slot = -1;
            if (OpenCluster.Value && !string.IsNullOrWhiteSpace(key))
            {
                slot = RedisClient.HashSlot(key);
            }
            if (OpenCluster.Value && slot > -1) //open cluster mode wanglei add
            {
                ClusterNode clusterNode = this.ClusterNodes.FirstOrDefault(s => s.Slot1 <= slot && s.Slot2 >= slot && s.Slave==false);
                if (clusterNode == null)
                {
                    return null;
                }
                RedisEndpoint rePoint = ReadWriteHosts.FirstOrDefault(f =>
                        f.Host == clusterNode.Host && f.Port == clusterNode.Port
                    );
                if (rePoint == null)
                {
                    return null;
                }
                return InitNewClient(rePoint);
            }
            else
            {
                var desiredIndex = ReadPoolIndex % readClients.Length;
                //this will loop through all hosts in readClients once even though there are 2 for loops
                //both loops are used to try to get the prefered host according to the round robin algorithm
                for (int x = 0; x < ReadOnlyHosts.Count; x++)
                {
                    var nextHostIndex = (desiredIndex + x) % ReadOnlyHosts.Count;
                    var nextHost = ReadOnlyHosts[nextHostIndex];
                    for (var i = nextHostIndex; i < readClients.Length; i += ReadOnlyHosts.Count)
                    {
                        if (readClients[i] != null && !readClients[i].Active && !readClients[i].HadExceptions)
                            return readClients[i];
                        else if (readClients[i] == null || readClients[i].HadExceptions)
                        {
                            if (readClients[i] != null)
                                readClients[i].DisposeConnection();
    
                            var client = InitNewClient(nextHost);
                            readClients[i] = client;
    
                            return client;
                        }
                    }
                }
            }
            return null;
        }
    

    新規クラス
    /// 
    /// redis cluster client extension 
    /// wanglei add
    /// 
    public partial class RedisClient
        : IRedisClient
    {
        internal ClusterNode _clusterNode = null;
        internal const int NoSlot = -1;
        internal const int RedisClusterSlotCount = 16384;
    
        /// 
        /// Get Cluster Nodes Info
        /// wanglei add
        /// 
        internal string GetClusterInfo()
        {
            return SendExpectString(Commands.Cluster, "NODES".ToUtf8Bytes());
        }
    
        /// 
        /// Fill Slot Range To all
        /// 
        /// 
        internal void FillSlot(List clusterNodes)
        {
            List nodes = clusterNodes.Where(s => s.ParentNodeId != "-").ToList();
            foreach (var node in nodes)
            {
                ClusterNode n = clusterNodes.FirstOrDefault(f => f.NodeID == node.ParentNodeId);
                if(n != null)
                {
                    node.Slot1 = n.Slot1;
                    node.Slot2 = n.Slot2;
                }
            }
        }
    
        internal static unsafe int IndexOf(byte* ptr, byte value, int start, int end)
        {
            for (int offset = start; offset < end; offset++)
                if (ptr[offset] == value) return offset;
            return -1;
        }
       
        static readonly ushort[] crc16tab =
            {
                0x0000,0x1021,0x2042,0x3063,0x4084,0x50a5,0x60c6,0x70e7,
                0x8108,0x9129,0xa14a,0xb16b,0xc18c,0xd1ad,0xe1ce,0xf1ef,
                0x1231,0x0210,0x3273,0x2252,0x52b5,0x4294,0x72f7,0x62d6,
                0x9339,0x8318,0xb37b,0xa35a,0xd3bd,0xc39c,0xf3ff,0xe3de,
                0x2462,0x3443,0x0420,0x1401,0x64e6,0x74c7,0x44a4,0x5485,
                0xa56a,0xb54b,0x8528,0x9509,0xe5ee,0xf5cf,0xc5ac,0xd58d,
                0x3653,0x2672,0x1611,0x0630,0x76d7,0x66f6,0x5695,0x46b4,
                0xb75b,0xa77a,0x9719,0x8738,0xf7df,0xe7fe,0xd79d,0xc7bc,
                0x48c4,0x58e5,0x6886,0x78a7,0x0840,0x1861,0x2802,0x3823,
                0xc9cc,0xd9ed,0xe98e,0xf9af,0x8948,0x9969,0xa90a,0xb92b,
                0x5af5,0x4ad4,0x7ab7,0x6a96,0x1a71,0x0a50,0x3a33,0x2a12,
                0xdbfd,0xcbdc,0xfbbf,0xeb9e,0x9b79,0x8b58,0xbb3b,0xab1a,
                0x6ca6,0x7c87,0x4ce4,0x5cc5,0x2c22,0x3c03,0x0c60,0x1c41,
                0xedae,0xfd8f,0xcdec,0xddcd,0xad2a,0xbd0b,0x8d68,0x9d49,
                0x7e97,0x6eb6,0x5ed5,0x4ef4,0x3e13,0x2e32,0x1e51,0x0e70,
                0xff9f,0xefbe,0xdfdd,0xcffc,0xbf1b,0xaf3a,0x9f59,0x8f78,
                0x9188,0x81a9,0xb1ca,0xa1eb,0xd10c,0xc12d,0xf14e,0xe16f,
                0x1080,0x00a1,0x30c2,0x20e3,0x5004,0x4025,0x7046,0x6067,
                0x83b9,0x9398,0xa3fb,0xb3da,0xc33d,0xd31c,0xe37f,0xf35e,
                0x02b1,0x1290,0x22f3,0x32d2,0x4235,0x5214,0x6277,0x7256,
                0xb5ea,0xa5cb,0x95a8,0x8589,0xf56e,0xe54f,0xd52c,0xc50d,
                0x34e2,0x24c3,0x14a0,0x0481,0x7466,0x6447,0x5424,0x4405,
                0xa7db,0xb7fa,0x8799,0x97b8,0xe75f,0xf77e,0xc71d,0xd73c,
                0x26d3,0x36f2,0x0691,0x16b0,0x6657,0x7676,0x4615,0x5634,
                0xd94c,0xc96d,0xf90e,0xe92f,0x99c8,0x89e9,0xb98a,0xa9ab,
                0x5844,0x4865,0x7806,0x6827,0x18c0,0x08e1,0x3882,0x28a3,
                0xcb7d,0xdb5c,0xeb3f,0xfb1e,0x8bf9,0x9bd8,0xabbb,0xbb9a,
                0x4a75,0x5a54,0x6a37,0x7a16,0x0af1,0x1ad0,0x2ab3,0x3a92,
                0xfd2e,0xed0f,0xdd6c,0xcd4d,0xbdaa,0xad8b,0x9de8,0x8dc9,
                0x7c26,0x6c07,0x5c64,0x4c45,0x3ca2,0x2c83,0x1ce0,0x0cc1,
                0xef1f,0xff3e,0xcf5d,0xdf7c,0xaf9b,0xbfba,0x8fd9,0x9ff8,
                0x6e17,0x7e36,0x4e55,0x5e74,0x2e93,0x3eb2,0x0ed1,0x1ef0
            };
    
    
        /// 
        /// get slot number by key
        /// 
        /// 
        /// 
        internal static unsafe int HashSlot(string key)
        {
            if (string.IsNullOrEmpty(key)) return NoSlot;
            unchecked
            {
                var blob = System.Text.Encoding.UTF8.GetBytes(key);
                fixed (byte* ptr = blob)
                {
                    int offset = 0, count = blob.Length, start, end;
                    if ((start = IndexOf(ptr, (byte)'{', 0, count - 1)) >= 0
                        && (end = IndexOf(ptr, (byte)'}', start + 1, count)) >= 0
                        && --end != start)
                    {
                        offset = start + 1;
                        count = end - start; // note we already subtracted one via --end
                    }
    
                    uint crc = 0;
                    for (int i = 0; i < count; i++)
                        crc = ((crc << 8) ^ crc16tab[((crc >> 8) ^ ptr[offset++]) & 0x00FF]) & 0x0000FFFF;
                    return (int)(crc % RedisClusterSlotCount);
                }
            }
        }
    }
    

    クライアントを取得するとき、ServiceStack.Redisの設計者はRedisClientをより良く取得するためにポーリングアルゴリズムを用いて各Client Pool内のオブジェクトを順次取得したが,クラスタモードに切り替えると単一例モードとは異なり,彼のkey slotはポーリングの経路を完全に阻害したので,コードに最小限の影響を及ぼすためにif elseを用いてコードを完全に切断した.
    読み取り専用クライアントを取得する場合、クラスタモードも一例とは異なり、メインノードに依存して目的を達成するので、私もReadWriteClientを使用して所望のClientオブジェクトを返す.
    すべてのコードの修正が完了しました.テストできます!

    六、テスト結果##


    テストコード:
    PooledRedisClientManager prcm = new PooledRedisClientManager(RedisConfig.ReadWriteHost, RedisConfig.ReadOnlyHost,
                                                new RedisClientManagerConfig
                                                {
                                                    MaxWritePoolSize = RedisConfig.MaxWritePoolSize,
                                                    MaxReadPoolSize = RedisConfig.MaxReadPoolSize,
                                                    AutoStart = RedisConfig.AutoStart
                                                });
    
    prcm.OpenCluster = true; //          .
    
    string key_tmp = "key";
    string value = "wanglei_test_value";
    //   
    for (int i = 1; i <= 10; i++)
    {
        string key = key_tmp + i.ToString();
        using (IRedisClient irc = prcm.GetClient(key))
        {
             irc.SetEntry(key, value+i.ToString());
        }
    }
    //   
    for (int i = 1; i <= 10; i++)
    {
        string key = key_tmp + i.ToString();
        using (IRedisClient irc = prcm.GetReadOnlyClient(key))
        {
             Console.WriteLine(irc.GetValue(key));
        }
    }
    

    書き込みデータの分散:

    七、いくつかの問題


    テストの結果、ほとんどのkey操作は現在完全に互換性があるRedis 3.0ですが、setのsunion操作のように、クラスタモードで使用すると、2つのkeyが同じノードでないとkey slotが見つからないという問題が発生し、元のredis-cli-cのクラスタモードにも同様の問題があるため、現在のところこのような問題は解決されていません.今後、redisの著者はこのような操作を改善するかどうか分かりません.
    もし何か問題があれば、タイムリーに交流することができて、技術の交流が好きです!