[.Net]TCPレベルのリバースエージェント、Socket接続プール、およびパケット解析器

28739 ワード

背景
最近とても忙しくて、ブログは久しぶりに更新しました.2010-2011年のいくつかを振り返った.Netプロジェクトコードは、初心者にとって参考になるかもしれないと思いますが、ここでshareしてみましょう.主な内容は次のとおりです.
  • TCP逆プロキシ
  • Socket接続プール
  • パケット解析
  • 逆プロキシ
    一般的なWeb逆エージェントはよく知られていますが、主にクライアントとサービス側の間にエージェントサーバを架設し、クライアントの要求をサービス側またはデータベースに転送し、結果をクライアントに返信します.主な特徴は次のとおりです.
    1、一部のデータベースI/Oが重すぎて、更新が頻繁でないデータ、あるいはファイル、ピクチャなどの静的データをキャッシュする.2、クライアント(パブリックネットワーク)とサービス側(windowsサービス、Webサービス、ファイルサービス)を隔離し、リバースエージェントサーバのIP、名前、host、ポートなどだけをパブリックネットワークに暴露する.3、第2のポイントに基づいて、それは軽量で、いつでも再起動可能であるべきで、これはサービス側自身が所在するサーバーの再起動の代価が高い或いは再起動に耐えられない条件の下で、極めて役に立つ.
    例えば、サービス側自体が大量のビジネスロジックを処理する必要があり、再計算(cpuとメモリの要求が高い)、再I/O(ディスクとネットワークの要求が高い)、またはハイブリッドタイプに関連する可能性がある.サービス側のマシンコストは高い.より強力なcpu、より大きな容量のメモリ、より速いディスクとネットワークが必要であるため、DDOSとCCの防御能力を考慮する必要がある場合、サービス側の機械コストは急激に上昇する.
    この場合、リバースエージェント技術を考慮して、リバースエージェントサーバ(例えば、阿xクラウドのクラウドホスト、5 Gハードガード付きであるが、SSD以外のクラウドディスクI/O能力が劣り、この場合、業務サービス側のホストマシンとしてはならないが、リバースエージェントサーバとしてもよい)としてリバースエージェント分散クラスタを構成するために、ハードガード付き、構成がサービス側よりはるかに低い安価なマシンを選択することができる.
    DDOS攻撃では、トラフィックがピークに集約されなければならず、ガードマシンを殺すことができないが、DDOS攻撃者が備えるトラフィック打圧マシンとネットワーク条件によっては、通常、逆エージェント分布クラスタを通じて、1台の逆エージェントが殺され、死亡またはブラックホールウィンドウは通常30分から数時間以内である.比較的余裕のある逆エージェント備蓄が保証され、クラスタ全体が戦死する前にブラックホールから飛び出して復活するエージェントがクラスタに再加入してクライアントにサービスを提供することができれば、対抗を形成することができる.備蓄が不足していても、少なくとも攻撃を受けたときの意思決定にもっと時間を稼ぐことができます.
    以上のように、リバースエージェント技術は、追加のネットワーク伝送時間を増加させることによって、多くのクライアントがサービス側と直接接続するのに備えられていない利点を得ることができる.
    通常のウェブアプリケーションサーバ、例えばnginxは、逆エージェント機能を提供することができる.
    しかし、tcpレベルの逆エージェントは少ない.
    当時のプロジェクトは、次のような基礎コンポーネントを使用する必要がありました.
    接続プール
    クライアントがtcpプロトコルと逆プロキシサーバ通信、例えば一般的なデスクトップクライアントを使用する場合、単一の長接続+非同期方式でプロキシサーバに接続することが考えられる.
    一方、複数のエージェントサーバと真のビジネス・サービス・エンドの間には、エージェントとサービス・エンドの間で同期通信が多いため、効率的に、マルチ接続+プール化の技術を使用して、接続を長、短の間にし、両者の長所を総合することが考えられます.
    次に、プロジェクトで実際に使用された接続プールコードを示します.実装では、当時のMongoDBのC#ドライバ部分を参照しています.
        /// <summary>
        ///    
        ///      :
        /// 1:          ,      
        /// 2:        ,        
        ///     :           ,      
        ///     :     200   ,      .
        /// 5:              ,  :           
        /// 6:             ,  :           
        /// 7:                               
        /// </summary>
        public class SessionPool
        {
            private object _poolLock = new object();
            public int PoolSize { get; set; }
    
            public IList<SyncTcpSession> AvaliableSessions
            {
                get { return _avaliableSessions; }
            }
    
            public ILog Logger;
            private int _waitQueueSize;
            private bool _inMaintainPoolSize;
            private bool _inEnsureMinConnectionPoolSizeWorkItem;
            private IList<SyncTcpSession> _avaliableSessions = new List<SyncTcpSession>();
            public int MaxWaitQueueSize { get; set; }
            public int MaxConnectionPoolSize { get; set; }
            public int MinConnectionPoolSize { get; set; }
    
            public TimeSpan WaitQueueTimeout { get; set; }
            /// <summary>
            ///         ( )
            /// </summary>
            public TimeSpan MaxConnectionLifeTime { get; set; }
            /// <summary>
            ///         ( )
            /// </summary>
            public TimeSpan MaxConnectionIdleTime { get; set; }
            public IPEndPoint RemoteAddr { get; set; }
    
            public SessionPool(ILog log)
            {
                Logger = log;
            }
    
            /// <summary>
            ///       
            /// </summary>
            /// <returns></returns>
            public SyncTcpSession GetAvaliableSession()
            {
                lock (_poolLock)
                {
                    //                
                    //             ,         
                    //                   
                    if (_waitQueueSize >= MaxWaitQueueSize)
                    {
                        var ex = new Exception("            !");
                        Logger.Error(ex.Message, ex);
                        return null;
                    }
                    _waitQueueSize += 1;
                    try
                    {
                        DateTime timeoutAt = DateTime.Now + WaitQueueTimeout;
                        while (true)
                        {
                            //     
                            if (_avaliableSessions.Count > 0)
                            {
                                //             
                                for (int i = _avaliableSessions.Count - 1; i >= 0; i--)
                                {
                                    if (_avaliableSessions[i].State == SessionState.Open)
                                    {
                                        var connection = _avaliableSessions[i];
                                        _avaliableSessions.RemoveAt(i);
                                        return connection;
                                    }
                                }
    
                                //             ,      
                                AvaliableSessions[0].Close();
                                AvaliableSessions.RemoveAt(0);
                                return new SyncTcpSession(this);
                            }
    
                            //     ,    
                            if (PoolSize < MaxConnectionPoolSize)
                            {
                                var connection = new SyncTcpSession(this);
                                PoolSize += 1;
                                return connection;
                            }
    
                            //               ,       .
                            var timeRemaining = timeoutAt - DateTime.Now;
                            if (timeRemaining > TimeSpan.Zero)
                            {
                                Monitor.Wait(_poolLock, timeRemaining);
                            }
                            else
                            {
                                //    ,             ,         ,                   
                                var ex = new TimeoutException("  SyncTcpSession   .");
                                Logger.Error(ex.Message, ex);
                            }
                        }
                    }
                    finally
                    {
                        _waitQueueSize -= 1;
                    }
                }
            }
    
            /// <summary>
            ///      
            /// </summary>
            public void Clear()
            {
                lock (_poolLock)
                {
                    foreach (var connection in AvaliableSessions)
                    {
                        connection.Close();
                    }
                    AvaliableSessions.Clear();
                    PoolSize = 0;
                    Monitor.Pulse(_poolLock);
                    Logger.Info("      .");
                }
            }
            /// <summary>
            ///           
            /// </summary>
            public void MaintainPoolSize()
            {
                if (_inMaintainPoolSize)
                {
                    return;
                }
    
                _inMaintainPoolSize = true;
                try
                {
                    IList<SyncTcpSession> connectionsToRemove = new List<SyncTcpSession>();
                    lock (_poolLock)
                    {
                        var now = DateTime.Now;
                        //   :         ,        ,             ,        
                        for (int i = AvaliableSessions.Count - 1; i >= 0; i--)
                        {
                            var connection = AvaliableSessions[i];
                            if (now > connection.CreatedAt + MaxConnectionLifeTime
                                || now > connection.LastUsedAt + MaxConnectionIdleTime
                                || connection.IsConnected() == false)
                            {
                                //      、           
                                //      
                                connectionsToRemove.Add(connection);
                                //         
                                AvaliableSessions.RemoveAt(i);
                            }
                        }
                        // }
                    }
    
                    //     
                    if (connectionsToRemove.Any())
                    {
                        int i = 0;
                        foreach (var connToRemove in connectionsToRemove)
                        {
                            i++;
                            RemoveConnection(connToRemove);
                        }
                        Logger.InfoFormat("      :  {0}.", i);
                    }
    
                    if (PoolSize < MinConnectionPoolSize)
                    {
                        ThreadPool.QueueUserWorkItem(EnsureMinConnectionPoolSizeWorkItem,null);
                    }
                }
                finally
                {
                    _inMaintainPoolSize = false;
                }
            }
    
            private void EnsureMinConnectionPoolSizeWorkItem(object state)
            {
                if (_inEnsureMinConnectionPoolSizeWorkItem)
                {
                    return;
                }
    
                _inEnsureMinConnectionPoolSizeWorkItem = true;
                try
                {
                    while (true)
                    {
                        lock (_poolLock)
                        {
                            if (PoolSize >= MinConnectionPoolSize)
                            {
                                return;
                            }
                        }
    
                        var connection = new SyncTcpSession(this);
                        try
                        {
                            var added = false;
                            lock (_poolLock)
                            {
                                if (PoolSize < MaxConnectionPoolSize)
                                {
                                    AvaliableSessions.Add(connection);
                                    PoolSize++;
                                    added = true;
                                    Monitor.Pulse(_poolLock);
                                }
                            }
    
                            if (!added)
                            {
                                connection.Close();
                            }
                        }
                        catch
                        {
                            Thread.Sleep(TimeSpan.FromSeconds(1));
                        }
                    }
                }
                catch
                {
                }
                finally
                {
                    _inEnsureMinConnectionPoolSizeWorkItem = false;
                }
            }
    
            /// <summary>
            ///     
            /// </summary>
            /// <param name="connection"></param>
            public void ReleaseConnection(SyncTcpSession connection)
            {
                //       ,       
                // RemoveConnection(connection);
                // return;
                if (connection == null)
                    return;
                if (connection.SessionPool != this)
                {
                    connection.Close();
                    Logger.Info("         .");
                }
    
                if (connection.State != SessionState.Open)
                {
                    RemoveConnection(connection);
                    Logger.Info("    :     .");
                    return;
                }
    
                if (DateTime.Now - connection.CreatedAt > MaxConnectionLifeTime)
                {
                    RemoveConnection(connection);
                    Logger.Info("    :        .");
                    return;
                }
    
                lock (_poolLock)
                {
                    connection.LastUsedAt = DateTime.Now;
                    AvaliableSessions.Add(connection);
                    Monitor.Pulse(_poolLock);
                }
            }
    
            /// <summary>
            ///        
            /// </summary>
            /// <param name="connection"></param>
            private void RemoveConnection(SyncTcpSession connection)
            {
                lock (_poolLock)
                {
                        AvaliableSessions.Remove(connection); 
                        PoolSize -= 1;
                        Monitor.Pulse(_poolLock);
                }
    
                connection.Close();
            }

    上記のコードでは、使用される同期接続SynctcpSessionは、主にサーバ間の同期通信に使用され、その保証特性は以下のとおりです.
  • は、接続プールに関連付けられ、管理されます.
  • はステータスがなく、ネットワークが失敗すると接続をクリーンアップし、タイムアウトと再送はクライアントによって処理される.
  • は、同期通信能力を提供する.
  • の導入時とビジネスサーバ間のネットワークはイントラネットであるため,パケットの解析を簡素化する.
  • bufferプールを使用せず、MemoryStreamオブジェクトのみを使用し、必要に応じてオブジェクトを構築する.これは、エージェント・タスクに基づいてそれほど重くないことを考慮し、転送を主とし、キャッシュを補助とし、複数のクラスタを構成し、全体のメモリが相対的に余裕を持っている(これに比べて、ビジネス・サービス側はbufferプールを採用している).

  • コードは次のとおりです.
        /// <summary>
        ///       
        /// </summary>
        public class SyncTcpSession
        {
            private readonly object _instanceLock = new object();
            private readonly ILog _logger;
    
            private readonly IPEndPoint _remoteAddr;
            private readonly SessionPool _sessionPool;
            private Socket _socket;
    
            private SessionState _state = SessionState.Initial;
    
            public SessionPool SessionPool { get { return _sessionPool; } }
    
            public SyncTcpSession(SessionPool pool)
            {
                _logger = pool.Logger;
                _sessionPool = pool;
                _remoteAddr = _sessionPool.RemoteAddr;
                CreatedAt = DateTime.Now;
                _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)
                              {SendTimeout = 90000, ReceiveTimeout = 180000};
            }
    
            public DateTime CreatedAt { get; set; }
    
            public DateTime LastUsedAt { get; set; }
    
            public SessionState State { get { return _state; } }
    
            public void Connect()
            {
                _socket.Connect(_remoteAddr);
                LastUsedAt = DateTime.Now;
                _state = SessionState.Open;
            }
    
            public void Close()
            {
                lock (_instanceLock)
                {
                    try
                    {
                        if (_socket == null || _state == SessionState.Closed)
                            return;
                        if (_socket.Connected)
                        {
                            try
                            {
                                _socket.Shutdown(SocketShutdown.Both);
                            }
                            finally
                            {
                                _socket.Close();
                                _socket.Dispose();
                                _socket = null;
                            }
                            _state = SessionState.Closed;
                        }
                    }
                    catch
                    {
                    }
                  //  _logger.Info("session is closed");
                }
            }
    
            /// <summary>
            ///         
            /// </summary>
            /// <returns></returns>
            public bool IsConnected()
            {
                if(_state!=SessionState.Open)
                {
                    return false;
                }
    
                byte[] tmp = new byte[1];
                try
                {
                    _socket.Blocking = false;
                    _socket.Send(tmp, 0, 0);
                    _socket.Blocking = true;
                    return true;
                }catch(SocketException ex)
                {
                    _logger.Error("[Not Connected]");
                    return false;
                }
            }
    
            public bool Send(byte[] sentBytes)
            {
                lock (_instanceLock)
                {
                    LastUsedAt = DateTime.Now;
                    if (_state == SessionState.Initial)
                    {
                        //        ,    
                        if(!Open())
                        {
                            _state = SessionState.Closed;
                            return false;
                        }
                    }
    
                    //        ,    
                    if(!IsConnected())
                    {
                        _state = SessionState.Closed;
                        return false;
                    }
    
                    //           
                    int allLen = sentBytes.Length;
                    try
                    {
                        while (allLen > 0)
                        {
                            //        ,           ,       
                            int sent = _socket.Send(sentBytes, 0, sentBytes.Length, SocketFlags.None);
                            allLen -= sent;
                        }
    
                        LastUsedAt = DateTime.Now;
                        return true;
                    }
                    catch (SocketException ex)
                    {
                        //      ,    
                        _logger.Error(string.Format("[Send Failed] {0}", ex.Message), ex);
                        _state = SessionState.Closed;
                        return false;
                    }
                }
            }
    
            public byte[] Receive(out bool successful)
            {
                successful = false;
                const int headerLen = 8;
                byte[] ret = null;
                bool foundHeader = false;
                LastUsedAt = DateTime.Now;
                if (_socket == null || !_socket.Connected) return null;
                lock (_instanceLock)
                {
                    //     ,      ,     
                    var buffer = new byte[16*1024];
                    int remaining = -1;
                    using (var ms = new MemoryStream())
                    {
                        try
                        {
                            while (remaining != 0)
                            {
                                int allLen = _socket.Receive(buffer, 0, buffer.Length, SocketFlags.None);
                                if (!foundHeader)
                                {
                                    if (allLen >= headerLen)
                                    {
                                        foundHeader = true;
                                        int bodyLen = (buffer[4] << 24) + (buffer[5] << 16) + (buffer[6] << 8) +
                                                      buffer[7];
                                        remaining = (int) (headerLen + bodyLen - ms.Length);
                                    }
                                }
                                ms.Write(buffer, 0, allLen);
                                if (foundHeader)
                                {
                                    remaining -= allLen;
                                }
                            }
    
                            ret = new byte[ms.Length];
                            ms.Position = 0;
                            ms.Read(ret, 0, ret.Length);
    
                            LastUsedAt = DateTime.Now;
                            successful = true;
                            return ret;
                        }
                        catch (Exception ex)
                        {
                            successful = false;
                            _state = SessionState.Closed;
                            _logger.Error(string.Format("[Recv Failed] {0}", ex.Message), ex);
                        }
                    }
                }
    
                return ret;
            }
    
            public bool Open()
            {
                try
                {
                    Connect();
                    return true;
                }
                catch (Exception ex)
                {
                    _state = SessionState.Closed;
                    _logger.Error(string.Format("[Open Failed] {0}", ex.Message), ex);
                    return false;
                }
            }
    
            public bool SendRequest(byte[] package)
            {
               return Send(package);
            }
    
            public byte[] GetBody(byte[] data)
            {
                if (data == null || data.Length < 1)
                {
                    return null;
                }
    
                if (data.Length < 8)
                {
                    _logger.Error("           ");
                    return null;
                }
    
                int bodyLen = (data[4] << 24) + (data[5] << 16) + (data[6] << 8) +
                              data[7];
                var body = new byte[bodyLen];
                if (bodyLen + 8 != data.Length)
                {
                    _logger.ErrorFormat("    :totalLen:({0}),bodyLen:{1}", data.Length, body.Length);
                    return null;
                }
                Buffer.BlockCopy(data, 8, body, 0, bodyLen);
                return body;
            }
        }

    パケット解析
    スライスしてデータを送信する能力を提供し、同時に粘着問題を避けるために、パケット解析を行う必要があります.完備版のパケット解析コードは以下の特性を提供します.
  • は、受信したバイトストリームが完全なパケットを含むかどうかを区別することができ、もし、イベントが投げ出され、そうでなければ、データを受信し続けることができる.
  • は、パケットヘッダが完全であるかどうかを検出することができる.
  • は、パッケージが完全であるかどうかを検出することができる.
  • ネットワークの品質が悪い場合(国外で国内、非ローカルエリアネットワーク、電気通信、連通)はこの解析器を使用し、クライアントとエージェント間で通信し、この解析器を採用する.

  • コードは次のとおりです.
        /// <summary>
        ///     
        /// </summary>
        public class PackageAnalyzer
        {
            public PackageAnalyzer(int headerLen)
            {
                _headerLen = headerLen;
                _header = new byte[_headerLen];
            }
    
            /// <summary>
            ///     
            /// </summary>
            private readonly int _headerLen;
    
            /// <summary>
            ///     
            /// </summary>
            private readonly byte[] _header;
    
            /// <summary>
            ///              
            /// </summary>
            private int _requiredDataLength;
    
            /// <summary>
            ///                   
            /// </summary>
            private int _receivedHeaderLength;
    
            /// <summary>
            ///       
            /// </summary>
            private Header _headerFlag;
    
            /// <summary>
            ///     
            /// </summary>
            private enum Header
            {
                NotFound, Found, PartialFound
            }
    
            /// <summary>
            ///        
            /// </summary>
            private bool _headerWritten;
    
            /// <summary>
            ///     
            /// </summary>
            public BufferWriter Writer { get; set; }
    
            /// <summary>
            ///         
            /// </summary>
            public bool EnabledVariant { get; set; }
    
            /// <summary>
            ///            
            /// </summary>
            /// <param name="requestInfo"></param>
            public delegate void OnAnalyzeSuccess(BinaryResponseInfo requestInfo);
    
            /// <summary>
            ///         ,      
            /// </summary>
            /// <param name="data">   </param>
            /// <param name="offset">  </param>
            /// <param name="total">    </param>
            /// <param name="onAnalyzeSuccessCallback">       </param>
            public void TryProcess(byte[] data, int offset, int total, OnAnalyzeSuccess onAnalyzeSuccessCallback)
            {
                while (total > 0)
                {
                    //       
                    if (_headerFlag == Header.NotFound)
                    {
                        //  
                        if (total >= _headerLen)
                        {
                            //      
                            _headerFlag = Header.Found;
                            Array.Copy(data, offset, _header, 0, _headerLen);
                            offset += _headerLen;
                            total -= _headerLen;
    
                            //      
                            _requiredDataLength = (_header[4] << 24) + (_header[5] << 16) + (_header[6] << 8) + _header[7];
                            _receivedHeaderLength = 0;
                            //    
                        } //  
                        else
                        {
                            Array.Copy(data, offset, _header, 0, total);
                            _receivedHeaderLength += total;
                            _headerFlag = Header.PartialFound;
                            break;
                        }
                    }
                    //     
                    if (_headerFlag == Header.Found)
                    {
                        //        
                        if (total >= _requiredDataLength)
                        {
                            //    
                            //      
                            if (!_headerWritten)
                            {
                                Writer = new BufferWriter(System.Text.Encoding.UTF8);
                                _headerWritten = true;
                                Writer.Write(_header, 0, _headerLen);
                            }
                            Writer.Write(data, offset, _requiredDataLength);
    
                            offset += _requiredDataLength;
                            total -= _requiredDataLength;
    
                            //      
                            _requiredDataLength = 0;
                            _receivedHeaderLength = 0;
                            _headerFlag = Header.NotFound;
                            _headerWritten = false;
    
                            //       ,      
                            //////////////////////////////////////////////////////
    
                            var reader = new BufferReader(System.Text.Encoding.UTF8, Writer) { EnabledVariant = EnabledVariant };
    
                            BinaryResponseInfo responseInfo;
                            MessageRead(reader, out responseInfo);
                            if (responseInfo != null)
                                if (onAnalyzeSuccessCallback != null)
                                    onAnalyzeSuccessCallback(responseInfo);
    
                            //////////////////////////////////////////////////////
    
                        }
                        //        
                        else
                        {
                            //    
                            //      
                            if (!_headerWritten)
                            {
                                Writer = new BufferWriter(System.Text.Encoding.UTF8);
                                _headerWritten = true;
                                Writer.Write(_header, 0, _headerLen);
                            }
                            //    
                            Writer.Write(data, offset, total);
                            _requiredDataLength -= total;
                            break;
                        }
                    }
    
                    //      
                    if (_headerFlag == Header.PartialFound)
                    {
                        //        
                        if (total + _receivedHeaderLength < _headerLen)
                        {
                            Array.Copy(data, offset, _header, _receivedHeaderLength, total);
                            _receivedHeaderLength += total;
                            break;
                        }
                        //        
                        else
                        {
                            _headerFlag = Header.Found;
                            var delta = _headerLen - _receivedHeaderLength;
                            Array.Copy(data, offset, _header, _receivedHeaderLength, delta);
    
                            total -= delta;
                            offset += delta;
    
                            //      
                            _requiredDataLength = (_header[4] << 24) + (_header[5] << 16) + (_header[6] << 8) + _header[7];
                            _receivedHeaderLength = 0;
                            //    
                        }
                    }
                }
    
            }
    
    
            /// <summary>
            ///          
            /// </summary>
            /// <param name="buffer">  </param>
            /// <param name="offset">  </param>
            /// <param name="length">  </param>
            /// <returns></returns>
            public virtual int GetDataLengthFromHeader(byte[] buffer, int offset, int length)
            {
                // 5、6、7、8       
                return (buffer[offset + 4] << 24) + (buffer[offset + 5] << 16) + (buffer[offset + 6] << 8) +
                       (buffer[offset + 7]);
            }
    
            /// <summary>
            ///       
            /// </summary>
            /// <param name="reader">      </param>
            /// <param name="requestInfo">    </param>
            private void MessageRead(BufferReader reader, out BinaryResponseInfo requestInfo)
            {
                var package = reader.ToBytes();
                //    (          )
                var serviceKey = System.Text.Encoding.UTF8.GetString(package, 0, 4);
                var bodyLen = (package[4] << 24) + (package[5] << 16) + (package[6] << 8) + (package[7]);
    
                //  System.Diagnostics.Debug.Assert(bodyLen > 0, "     ");
                var body = new byte[bodyLen];
                Buffer.BlockCopy(package, _headerLen, body, 0, bodyLen);
                requestInfo = new BinaryResponseInfo() { Key = serviceKey, Body = body };
            }
        }

    歴史的な理由から、いくつかのネーミングは適切ではなく、コード構造もよくありません.例えば、上記のクラスはParserと呼ばれているほうが適切かもしれません.
    締めくくり
    ネットプログラミングを初めて勉強した友达に役に立つことを望んでいます.