[.Net実装]TCPレベルのリバースエージェント、ソケット接続プール、およびパケット解析器

57447 ワード

背景
最近とても忙しくて、ブログは久しぶりに更新しました.2010-2011年のいくつかを振り返った.Netプロジェクトコードは、初心者にとって参考になるかもしれないと思いますが、ここでshareしてみましょう.主な内容は次のとおりです.
  • TCP逆プロキシ
  • Socket接続プール
  • パケット解析
  • 逆プロキシ
    一般的なWeb逆エージェントはよく知られていますが、主にクライアントとサービス側の間にエージェントサーバを架設し、クライアントの要求をサービス側またはデータベースに転送し、結果をクライアントに返信します.主な特徴は次のとおりです.
    1、一部のデータベースI/Oが重すぎて、更新が頻繁でないデータ、あるいはファイル、ピクチャなどの静的データをキャッシュする.2、クライアント(パブリックネットワーク)とサービス側(windowsサービス、Webサービス、ファイルサービス)を隔離し、リバースエージェントサーバのIP、名前、host、ポートなどだけをパブリックネットワークに暴露する.3、第2のポイントに基づいて、それは軽量で、いつでも再起動可能であるべきで、これはサービス側自身が所在するサーバーの再起動の代価が高い或いは再起動に耐えられない条件の下で、極めて役に立つ.
    例えば、サービス側自体が大量のビジネスロジックを処理する必要があり、再計算(cpuとメモリの要求が高い)、再I/O(ディスクとネットワークの要求が高い)、またはハイブリッドタイプに関連する可能性がある.サービス側のマシンコストは高い.より強力なcpu、より大きな容量のメモリ、より速いディスクとネットワークが必要であるため、DDOSとCCの防御能力を考慮する必要がある場合、サービス側の機械コストは急激に上昇する.
    この場合、リバースエージェント技術を考慮して、リバースエージェントサーバ(例えば、阿xクラウドのクラウドホスト、5 Gハードガード付きであるが、SSD以外のクラウドディスクI/O能力が劣り、この場合、業務サービス側のホストマシンとしてはならないが、リバースエージェントサーバとしてもよい)としてリバースエージェント分散クラスタを構成するために、ハードガード付き、構成がサービス側よりはるかに低い安価なマシンを選択することができる.
    DDOS攻撃では、トラフィックがピークに集約されなければならず、ガードマシンを殺すことができないが、DDOS攻撃者が備えるトラフィック打圧マシンとネットワーク条件によっては、通常、逆エージェント分布クラスタを通じて、1台の逆エージェントが殺され、死亡またはブラックホールウィンドウは通常30分から数時間以内である.比較的余裕のある逆エージェント備蓄が保証され、クラスタ全体が戦死する前にブラックホールから飛び出して復活するエージェントがクラスタに再加入してクライアントにサービスを提供することができれば、対抗を形成することができる.備蓄が不足していても、少なくとも攻撃を受けたときの意思決定にもっと時間を稼ぐことができます.
    以上のように、リバースエージェント技術は、追加のネットワーク伝送時間を増加させることによって、多くのクライアントがサービス側と直接接続するのに備えられていない利点を得ることができる.
    通常のウェブアプリケーションサーバ、例えばnginxは、逆エージェント機能を提供することができる.
    しかし、tcpレベルの逆エージェントは少ない.
    当時のプロジェクトは、次のような基礎コンポーネントを使用する必要がありました.
    接続プール
    クライアントがtcpプロトコルと逆プロキシサーバ通信、例えば一般的なデスクトップクライアントを使用する場合、単一の長接続+非同期方式でプロキシサーバに接続することが考えられる.
    一方、複数のエージェントサーバと真のビジネス・サービス・エンドの間には、エージェントとサービス・エンドの間で同期通信が多いため、効率的に、マルチ接続+プール化の技術を使用して、接続を長、短の間にし、両者の長所を総合することが考えられます.
    次に、プロジェクトで実際に使用された接続プールコードを示します.実装では、当時のMongoDBのC#ドライバ部分を参照しています. /// /// /// /// 1: , /// 2: , /// : , /// : 200 , . /// 5: , : /// 6: , : /// 7: /// public class SessionPool { private object _poolLock = new object(); public int PoolSize { get; set; } public IList AvaliableSessions { get { return _avaliableSessions; } } public ILog Logger; private int _waitQueueSize; private bool _inMaintainPoolSize; private bool _inEnsureMinConnectionPoolSizeWorkItem; private IList _avaliableSessions = new List(); public int MaxWaitQueueSize { get; set; } public int MaxConnectionPoolSize { get; set; } public int MinConnectionPoolSize { get; set; } public TimeSpan WaitQueueTimeout { get; set; } /// /// ( ) /// public TimeSpan MaxConnectionLifeTime { get; set; } /// /// ( ) /// public TimeSpan MaxConnectionIdleTime { get; set; } public IPEndPoint RemoteAddr { get; set; } public SessionPool(ILog log) { Logger = log; } /// /// /// /// public SyncTcpSession GetAvaliableSession() { lock (_poolLock) { // // , // if (_waitQueueSize >= MaxWaitQueueSize) { var ex = new Exception(" !"); Logger.Error(ex.Message, ex); return null; } _waitQueueSize += 1; try { DateTime timeoutAt = DateTime.Now + WaitQueueTimeout; while (true) { // if (_avaliableSessions.Count > 0) { // for (int i = _avaliableSessions.Count - 1; i >= 0; i--) { if (_avaliableSessions[i].State == SessionState.Open) { var connection = _avaliableSessions[i]; _avaliableSessions.RemoveAt(i); return connection; } } // , AvaliableSessions[0].Close(); AvaliableSessions.RemoveAt(0); return new SyncTcpSession(this); } // , if (PoolSize < MaxConnectionPoolSize) { var connection = new SyncTcpSession(this); PoolSize += 1; return connection; } // , . var timeRemaining = timeoutAt - DateTime.Now; if (timeRemaining > TimeSpan.Zero) { Monitor.Wait(_poolLock, timeRemaining); } else { // , , , var ex = new TimeoutException(" SyncTcpSession ."); Logger.Error(ex.Message, ex); } } } finally { _waitQueueSize -= 1; } } } /// /// /// public void Clear() { lock (_poolLock) { foreach (var connection in AvaliableSessions) { connection.Close(); } AvaliableSessions.Clear(); PoolSize = 0; Monitor.Pulse(_poolLock); Logger.Info(" ."); } } /// /// /// public void MaintainPoolSize() { if (_inMaintainPoolSize) { return; } _inMaintainPoolSize = true; try { IList connectionsToRemove = new List(); lock (_poolLock) { var now = DateTime.Now; // : , , , for (int i = AvaliableSessions.Count - 1; i >= 0; i--) { var connection = AvaliableSessions[i]; if (now > connection.CreatedAt + MaxConnectionLifeTime || now > connection.LastUsedAt + MaxConnectionIdleTime || connection.IsConnected() == false) { // 、 // connectionsToRemove.Add(connection); // AvaliableSessions.RemoveAt(i); } } // } } // if (connectionsToRemove.Any()) { int i = 0; foreach (var connToRemove in connectionsToRemove) { i++; RemoveConnection(connToRemove); } Logger.InfoFormat(" : {0}.", i); } if (PoolSize < MinConnectionPoolSize) { ThreadPool.QueueUserWorkItem(EnsureMinConnectionPoolSizeWorkItem); } } finally { _inMaintainPoolSize = false; } } private void EnsureMinConnectionPoolSizeWorkItem(object state) { if (_inEnsureMinConnectionPoolSizeWorkItem) { return; } _inEnsureMinConnectionPoolSizeWorkItem = true; try { while (true) { lock (_poolLock) { if (PoolSize >= MinConnectionPoolSize) { return; } } var connection = new SyncTcpSession(this); try { var added = false; lock (_poolLock) { if (PoolSize < MaxConnectionPoolSize) { AvaliableSessions.Add(connection); PoolSize++; added = true; Monitor.Pulse(_poolLock); } } if (!added) { connection.Close(); } } catch { Thread.Sleep(TimeSpan.FromSeconds(1)); } } } catch { } finally { _inEnsureMinConnectionPoolSizeWorkItem = false; } } /// /// /// /// public void ReleaseConnection(SyncTcpSession connection) { // , // RemoveConnection(connection); // return; if (connection == null) return; if (connection.SessionPool != this) { connection.Close(); Logger.Info(" ."); } if (connection.State != SessionState.Open) { RemoveConnection(connection); Logger.Info(" : ."); return; } if (DateTime.Now - connection.CreatedAt > MaxConnectionLifeTime) { RemoveConnection(connection); Logger.Info(" : ."); return; } lock (_poolLock) { connection.LastUsedAt = DateTime.Now; AvaliableSessions.Add(connection); Monitor.Pulse(_poolLock); } } /// /// /// /// private void RemoveConnection(SyncTcpSession connection) { lock (_poolLock) { AvaliableSessions.Remove(connection); PoolSize -= 1; Monitor.Pulse(_poolLock); } connection.Close(); }

    , SyncTcpSession, , :

    1. , , 。
    2. , 。
    3. buffer , MemoryStream , , , , , , ( , buffer )。

        ///   
        ///       
        /// 
        public class SyncTcpSession
        {
            private readonly object _instanceLock = new object();
            private readonly ILog _logger;
    
            private readonly IPEndPoint _remoteAddr;
            private readonly SessionPool _sessionPool;
            private Socket _socket;
    
            private SessionState _state = SessionState.Initial;
    
            public SessionPool SessionPool { get { return _sessionPool; } }
    
            public SyncTcpSession(SessionPool pool)
            {
                _logger = pool.Logger;
                _sessionPool = pool;
                _remoteAddr = _sessionPool.RemoteAddr;
                CreatedAt = DateTime.Now;
                GenerationId = _sessionPool.GenerationId;
                _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)
                              {SendTimeout = 90000, ReceiveTimeout = 180000};
            }
    
            public DateTime CreatedAt { get; set; }
    
            public DateTime LastUsedAt { get; set; }
    
            public SessionState State { get { return _state; } }
    
            public void Connect()
            {
                _socket.Connect(_remoteAddr);
                LastUsedAt = DateTime.Now;
                _state = SessionState.Open;
            }
    
            public void Close()
            {
                lock (_instanceLock)
                {
                    try
                    {
                        if (_socket == null || _state == SessionState.Closed)
                            return;
                        if (_socket.Connected)
                        {
                            try
                            {
                                _socket.Shutdown(SocketShutdown.Both);
                            }
                            finally
                            {
                                _socket.Close();
                                _socket.Dispose();
                                _socket = null;
                            }
                            _state = SessionState.Closed;
                        }
                    }
                    catch
                    {
                    }
                  //  _logger.Info("session is closed");
                }
            }
    
            ///   
            ///         
            /// 
            /// 
            public bool IsConnected()
            {
                if(_state!=SessionState.Open)
                {
                    return false;
                }
    
                byte[] tmp = new byte[1];
                try
                {
                    _socket.Blocking = false;
                    _socket.Send(tmp, 0, 0);
                    _socket.Blocking = true;
                    return true;
                }catch(SocketException ex)
                {
                    _logger.Error("[Not Connected]");
                    return false;
                }
            }
    
            public bool Send(byte[] sentBytes)
            {
                lock (_instanceLock)
                {
                    LastUsedAt = DateTime.Now;
                    if (_state == SessionState.Initial)
                    {
                        //        ,    
                        if(!Open())
                        {
                            _state = SessionState.Closed;
                            return false;
                        }
                    }
    
                    //        ,    
                    if(!IsConnected())
                    {
                        _state = SessionState.Closed;
                        return false;
                    }
    
                    //           
                    int allLen = sentBytes.Length;
                    try
                    {
                        while (allLen > 0)
                        {
                            //        ,           ,       
                            int sent = _socket.Send(sentBytes, 0, sentBytes.Length, SocketFlags.None);
                            allLen -= sent;
                        }
    
                        LastUsedAt = DateTime.Now;
                        return true;
                    }
                    catch (SocketException ex)
                    {
                        //      ,    
                        _logger.Error(string.Format("[Send Failed] {0}", ex.Message), ex);
                        _state = SessionState.Closed;
                        return false;
                    }
                }
            }
    
            public byte[] Receive(out bool successful)
            {
                successful = false;
                const int headerLen = 8;
                byte[] ret = null;
                bool foundHeader = false;
                LastUsedAt = DateTime.Now;
                if (_socket == null || !_socket.Connected) return null;
                lock (_instanceLock)
                {
                    //     ,      ,     
                    var buffer = new byte[16*1024];
                    int remaining = -1;
                    using (var ms = new MemoryStream())
                    {
                        try
                        {
                            while (remaining != 0)
                            {
                                int allLen = _socket.Receive(buffer, 0, buffer.Length, SocketFlags.None);
                                if (!foundHeader)
                                {
                                    if (allLen >= headerLen)
                                    {
                                        foundHeader = true;
                                        int bodyLen = (buffer[4] << 24) + (buffer[5] << 16) + (buffer[6] << 8) +
                                                      buffer[7];
                                        remaining = (int) (headerLen + bodyLen - ms.Length);
                                    }
                                }
                                ms.Write(buffer, 0, allLen);
                                if (foundHeader)
                                {
                                    remaining -= allLen;
                                }
                            }
    
                            ret = new byte[ms.Length];
                            ms.Position = 0;
                            ms.Read(ret, 0, ret.Length);
    
                            LastUsedAt = DateTime.Now;
                            successful = true;
                            return ret;
                        }
                        catch (Exception ex)
                        {
                            successful = false;
                            _state = SessionState.Closed;
                            _logger.Error(string.Format("[Recv Failed] {0}", ex.Message), ex);
                        }
                    }
                }
    
                return ret;
            }
    
            public bool Open()
            {
                try
                {
                    Connect();
                    return true;
                }
                catch (Exception ex)
                {
                    _state = SessionState.Closed;
                    _logger.Error(string.Format("[Open Failed] {0}", ex.Message), ex);
                    return false;
                }
            }
    
            public bool SendRequest(byte[] package)
            {
               return Send(package);
            }
    
            public byte[] GetBody(byte[] data)
            {
                if (data == null || data.Length < 1)
                {
                    return null;
                }
    
                if (data.Length < 8)
                {
                    _logger.Error("           ");
                    return null;
                }
    
                int bodyLen = (data[4] << 24) + (data[5] << 16) + (data[6] << 8) +
                              data[7];
                var body = new byte[bodyLen];
                if (bodyLen + 8 != data.Length)
                {
                    _logger.ErrorFormat("    :totalLen:({0}),bodyLen:{1}", data.Length, body.Length);
                    return null;
                }
                Buffer.BlockCopy(data, 8, body, 0, bodyLen);
                return body;
            }
        }  
    パケット
    スライスしてデータを する を し、 に を けるために、パケット を う があります. のパケット コードは の を します.
  • は、 したバイトストリームが なパケットを むかどうかを することができ、もし、イベントが げ され、そうでなければ、データを し けることができる.
  • は、パケットヘッダが であるかどうかを することができる.
  • は、パッケージが であるかどうかを することができる.
  • ネットワークの が い ( で 、 ローカルエリアネットワーク、 、 )はこの を し、クライアントとエージェント で し、この を する.

  • コードは のとおりです. /// /// /// public class PackageAnalyzer { public PackageAnalyzer(int headerLen) { _headerLen = headerLen; _header = new byte[_headerLen]; } /// /// /// private readonly int _headerLen; /// /// /// private readonly byte[] _header; /// /// /// private int _requiredDataLength; /// /// /// private int _receivedHeaderLength; /// /// /// private Header _headerFlag; /// /// /// private enum Header { NotFound, Found, PartialFound } /// /// /// private bool _headerWrited; /// /// /// public BufferWriter Writer { get; set; } /// /// /// public bool EnabledVariant { get; set; } /// /// /// /// public delegate void OnAnalyzeSuccess(BinaryResponseInfo requestInfo); /// /// /// /// /// /// /// public void TryProcess(byte[] data, int offset, int total, OnAnalyzeSuccess onAnalyzeSuccessCallback) { while (total > 0) { // if (_headerFlag == Header.NotFound) { // if (total >= _headerLen) { // _headerFlag = Header.Found; Array.Copy(data, offset, _header, 0, _headerLen); offset += _headerLen; total -= _headerLen; // _requiredDataLength = (_header[4] << 24) + (_header[5] << 16) + (_header[6] << 8) + _header[7]; _receivedHeaderLength = 0; // } // else { Array.Copy(data, offset, _header, 0, total); _receivedHeaderLength += total; _headerFlag = Header.PartialFound; break; } } // if (_headerFlag == Header.Found) { // if (total >= _requiredDataLength) { // // if (!_headerWrited) { Writer = new BufferWriter(System.Text.Encoding.UTF8); _headerWrited = true; Writer.Write(_header, 0, _headerLen); } Writer.Write(data, offset, _requiredDataLength); offset += _requiredDataLength; total -= _requiredDataLength; // _requiredDataLength = 0; _receivedHeaderLength = 0; _headerFlag = Header.NotFound; _headerWrited = false; // , ////////////////////////////////////////////////////// var reader = new BufferReader(System.Text.Encoding.UTF8, Writer) { EnabledVariant = EnabledVariant }; BinaryResponseInfo responseInfo; MessageRead(reader, out responseInfo); if (responseInfo != null) if (onAnalyzeSuccessCallback != null) onAnalyzeSuccessCallback(responseInfo); ////////////////////////////////////////////////////// } // else { // // if (!_headerWrited) { Writer = new BufferWriter(System.Text.Encoding.UTF8); _headerWrited = true; Writer.Write(_header, 0, _headerLen); } // Writer.Write(data, offset, total); _requiredDataLength -= total; break; } } // if (_headerFlag == Header.PartialFound) { // if (total + _receivedHeaderLength < _headerLen) { Array.Copy(data, offset, _header, _receivedHeaderLength, total); _receivedHeaderLength += total; break; } // else { _headerFlag = Header.Found; var delta = _headerLen - _receivedHeaderLength; Array.Copy(data, offset, _header, _receivedHeaderLength, delta); total -= delta; offset += delta; // _requiredDataLength = (_header[4] << 24) + (_header[5] << 16) + (_header[6] << 8) + _header[7]; _receivedHeaderLength = 0; // } } } } /// /// /// /// /// /// /// public virtual int GetDataLengthFromHeader(byte[] buffer, int offset, int length) { // 5、6、7、8 return (buffer[offset + 4] << 24) + (buffer[offset + 5] << 16) + (buffer[offset + 6] << 8) + (buffer[offset + 7]); } /// /// /// /// /// private void MessageRead(BufferReader reader, out BinaryResponseInfo requestInfo) { var package = reader.ToBytes(); // ( ) var serviceKey = System.Text.Encoding.UTF8.GetString(package, 0, 4); var bodyLen = (package[4] << 24) + (package[5] << 16) + (package[6] << 8) + (package[7]); // System.Diagnostics.Debug.Assert(bodyLen > 0, " "); var body = new byte[bodyLen]; Buffer.BlockCopy(package, _headerLen, body, 0, bodyLen); requestInfo = new BinaryResponseInfo() { Key = serviceKey, Body = body }; } }

    , , , , Parser 。