C++ネットワークプログラミング学習:サービス側マルチスレッド分離業務処理高負荷


ネットワークプログラミング学習レコード
  • で使用される言語はC/C++
  • です.
  • ソースコードでサポートされているプラットフォームは、Windows/Linux
  • です.
    笔记一:基础TCP服务端/クライアント 点我跳转笔记二:ネットワークデータメッセージの送受信 点我跳转笔记三:selectネットワークモデル 点我跳转笔记四:プラットフォームをまたいでWindowsをサポート、Linuxシステム 点我跳转笔记5:ソースコードのパッケージ 点我跳转笔记6:バッファオーバーフローと粘着包分包 点我跳转笔记7:サービス端マルチスレッド分离业务処理高负荷 点我跳转笔记
    ノート7
  • ネットワークプログラミング学習記録
  • 一、構想と準備
  • 二、コードの改良
  • 1.新規サブスレッドクラス
  • 2.クライアント・プライマリ・スレッド・クラスの変更
  • 3.インタフェースを導入し、サブスレッドのメインスレッドへの通信
  • を実現する.
  • 三、詳細コード実現
  • 1.タイマーヘッダファイルhpp`
  • 2.コマンドヘッダファイル`CMD.h`
  • 3.サービスヘッダファイル`TcpServer.hpp`
  • 4.サービス側サンプルコード`server.cpp`


  • 一、考え方と準備
    これまでのサービス側の考え方は、次のようになりました.
    1.  socket
    2.    IP
    3.    
    while(true)
    {
         
    	4.  select            socket
    	5.                
    	6.(     )
    }
    7.  socket
    

    ただし、このようなアーキテクチャはselect処理イベントが多い場合、効率が低下しやすい.このような問題に対して,生産者と消費者モデルを導入し,このような同時問題を処理することができる.  メインスレッドは、新しいクライアント加入イベントを処理し、新しいクライアントを消費者スレッドに割り当てるための生産者スレッドである.消費者スレッドは、クライアントから送信されたメッセージを処理するために作成された新しいスレッドです.これにより、イベント処理の分離が実現され、サービス側の処理効率が向上する.複数のクライアントが同時に殺到すると、接続をより迅速に確立することができる(このイベントを処理するために専用のスレッドがあるため).クライアントがメッセージを送信する頻度が速い場合、マルチスレッド処理メッセージも効率を大幅に向上させる.
  • の大まかな改善構想は以下の通りであり、赤色は今回加入する必要があるコアであり、黒は元のアーキテクチャ
  • である.
    したがって、まず、消費者スレッドに関するコンテンツをカプセル化し、マルチスレッドアーキテクチャを構築するためのスレッドクラスを新規作成する必要があります.その後、今回の改良では、現在のクライアント接続数、パケットの毎秒受信数を統計し、データを表示するためにタイマーを追加することにしました.同時に、私も新聞のタイプを分離して、新聞のタイプを個別のヘッダファイルに置いて、このように変更しやすくて引用しやすいです.
  • 1.タイマーについてはこちらをクリックしてください.マルチスレッド関連はここ
  • をクリックしてください
    二、コードの改善
    1.新規サブスレッドクラス
  • は、まず新規スレッドクラスCellServerであり、その中に含まれる基礎的な方法および関連変数は以下の通りである:
  • //   
    class CellServer
    {
         
    public:
    	//   
    	CellServer(SOCKET sock = INVALID_SOCKET);
    	//  
    	~CellServer();
    	//  socket 
    	void CloseSocket();
    	//        
    	bool IsRun();
    	//           
    	bool OnRun();
    	//    
    	int RecvData(ClientSocket *t_client);
    	//    
    	void NetMsg(DataHeader *head,SOCKET temp_socket);
    	//      
    	void addClient(ClientSocket* client);
    	//    
    	void Start(); 
    	//           
    	int GetClientCount()
    	
    private:
    	//      
    	char *_Recv_buf;//      
    	//socket   
    	SOCKET _sock; 
    	//       
    	std::vector<ClientSocket*> _clients;//     
    	//       
    	std::vector<ClientSocket*> _clientsBuf; 
    	std::mutex _mutex;// 
    	//   
    	std::thread* _pThread;
    
    public:
    	std::atomic_int _recvCount;//       
    };
    
    
  • の大まかな処理構想は以下の通りである:
  • Start()GetClientCount()                        
    //                 ,    
    addClient(ClientSocket* client)                ,               
    
       :
    OnRun()//    
    {
         
    	while(IsRun())//       
    	{
         
    		1.                 
    		2.continue    
    		3.select        ,           CloseSocket()
    		4.          RecvData(),        ,    NetMsg()
    	}
    }
    

    2.クライアント・スレッド・クラスの変更
      イベントの処理をサブスレッドに変更したため、まずプライマリスレッドではメッセージメッセージを処理する必要がなくなり、クラスでメッセージを受信したり、メッセージを処理したりする方法は削除できます.同時に、Startメソッドを追加してサブスレッドを起動し、time4msgメソッドを追加して、サブスレッド内のクライアント数、1秒当たりのパケット数などのデータを表示します.
  • メインスレッドクラスTcpServer、変更後:
  • class TcpServer : INetEvent
    {
         
    public:
    	//   
    	TcpServer();
    	//   
    	~TcpServer();
    	//   socket   1    
    	int InitSocket();
    	//  IP/  
    	int Bind(const char* ip,unsigned short port);
    	//    
    	int Listen(int n);
    	//    
    	int Accept();
    	//           
    	void AddClientToServer(ClientSocket* pClient);
    	//     
    	void Start();
    	//  socket 
    	void CloseSocket();
    	//        
    	bool IsRun();
    	//           
    	bool OnRun();
    	//          
    	void time4msg();
    	
    private:
    	//socket   
    	SOCKET _sock; 
    	std::vector<ClientSocket*> _clients;//     
    	std::vector<CellServer*> _cellServers;//     
    	//   
    	mytimer _time; 
    };
    
    
  • 大まかな処理構想は以下の通りである:タイマー関連はここ
  • をクリックしてください.
      TcpServer           :
    1.InitSocket()     socket
    2.Bind(const char* ip,unsigned short port)      IP
    3.Listen(int n)   
    4.Start()     
    while(5.IsRun())       
    {
         
    	6.OnRun()   select      
    }
    7.CloseSocket()   socket
    
        :
    OnRun()
    {
         
    	time4msg()     
    	select           
    	         ,  Accept()    
    	Accept()AddClientToServer(ClientSocket* pClient)          
    }
    
    AddClientToServer()GetClientCount()                
            addClient(ClientSocket* client)time4msg()GetSecond()        
          ,         :    _recvCount    ,    _clients.size()       
       UpDate()     ,       ,              
    

    3.インタフェースを導入し、サブスレッドからメインスレッドへの通信を実現する
    前の2つのステップの実現を通じて、マルチスレッドサービス側はすでに初歩的に完成し、次にいくつかの改善が必要である.  サブスレッドオブジェクトは、プライマリスレッドStart()メソッドで作成され、その後、コンテナ_cellServersに追加されて格納されることが容易にわかります.これにより、プライマリ・スレッドでは、サブスレッド・クラスのメソッドとメンバー変数を呼び出すことができますが、サブスレッドではプライマリ・スレッドのメソッドとメンバー変数を呼び出すことができません.これにより、サブスレッドにクライアントが終了した場合、プライマリスレッドは理解できません.  この場合、プライマリスレッドクラスにこのインタフェースを継承させるインタフェースを作成できます.サブスレッドは、このインタフェースを介してプライマリスレッド内の特定のメソッドを呼び出すことができます.
  • インタフェースクラスINetEventは以下の通りである:
  • class INetEvent
    {
         
    public:
    	//       
    	virtual void OnLeave(ClientSocket* pClient) = 0;
    private:	
    };
    
  • メインスレッドクラスとサブスレッドクラスの関連実装:
  • 1.class TcpServer : INetEvent
    
    2.//     
    void OnLeave(ClientSocket* pClient)
    {
         
    	//         
    	for(int n=0; n<_clients.size(); n++)
    	{
         
    		if(_clients[n] == pClient)
    		{
         
    			auto iter = _clients.begin() + n;
    			if(iter != _clients.end())
    			{
         
    				_clients.erase(iter);//   
    			}
    		}
    	} 
    }3.private:
    	INetEvent* _pNetEvent; 
          
    
    4.void setEventObj(INetEvent* event)
    {
         
    	_pNetEvent = event; 
    }
    event        ,          
    
    5.     、       ,     ,    this
         ->setEventObj(this);
    
    6.               OnLeave()       
    _pNetEvent->OnLeave(       );
    

    三、詳細コード実現
    1.タイマヘッダファイルmytimer.hpp
    #ifndef MY_TIMER_H_
    #define MY_TIMER_H_
    
    #include
    
    class mytimer
    {
         
    private:
        std::chrono::steady_clock::time_point _begin;//    
        std::chrono::steady_clock::time_point _end;//    
    public:
        mytimer()
    	{
         
    		_begin = std::chrono::steady_clock::time_point();
    		_end = std::chrono::steady_clock::time_point();
    	}
        
    	virtual ~mytimer(){
         };  
        
        //  update ,           
        void UpDate()
        {
         
            _begin = std::chrono::steady_clock::now();
        }
    
    	//  getsecond   ,                      。
        double GetSecond()
        {
         
            _end = std::chrono::steady_clock::now();
            //  duration              duration_cast       
            std::chrono::duration<double> temp = std::chrono::duration_cast<std::chrono::duration<double>>(_end - _begin);
           	return temp.count();//count()            
        }
        
    };
    
    #endif
    

    2.コマンドヘッダファイルCMD.h
    //         
    enum cmd 
    {
         
    	CMD_LOGIN,//   
    	CMD_LOGINRESULT,//     
    	CMD_LOGOUT,//   
    	CMD_LOGOUTRESULT,//     
    	CMD_NEW_USER_JOIN,//      
    	CMD_ERROR//   
    };
    //       
    struct DataHeader 
    {
         
    	short cmd;//  
    	short date_length;//     	
    };
    // 1           
    struct Login : public DataHeader 
    {
         
    	Login()//      
    	{
         
    		this->cmd = CMD_LOGIN;
    		this->date_length = sizeof(Login); 
    	}
    	char UserName[32];//    
    	char PassWord[32];//   
    };
    // 2          
    struct LoginResult : public DataHeader 
    {
         
    	LoginResult()//      
    	{
         
    		this->cmd = CMD_LOGINRESULT;
    		this->date_length = sizeof(LoginResult); 
    	}
    	int Result;
    };
    // 3          
    struct Logout : public DataHeader 
    {
         
    	Logout()//      
    	{
         
    		this->cmd = CMD_LOGOUT;
    		this->date_length = sizeof(Logout); 
    	}
    	char UserName[32];//    
    };
    // 4          
    struct LogoutResult : public DataHeader 
    {
         
    	LogoutResult()//      
    	{
         
    		this->cmd = CMD_LOGOUTRESULT;
    		this->date_length = sizeof(LogoutResult); 
    	}
    	int Result;
    };
    // 5            
    struct NewUserJoin : public DataHeader 
    {
         
    	NewUserJoin()//      
    	{
         
    		this->cmd = CMD_NEW_USER_JOIN;
    		this->date_length = sizeof(NewUserJoin); 
    	}
    	char UserName[32];//    
    };
    

    3.サービス側ヘッダファイルTcpServer.hpp
    #ifndef _TcpServer_hpp_
    #define _TcpServer_hpp_
    
    #ifdef _WIN32
    	#define FD_SETSIZE 10240 
    	#define WIN32_LEAN_AND_MEAN
    	#include
    	#include
    	#pragma comment(lib,"ws2_32.lib")//         windows   
    #else
    	#include//selcet
    	#include//uni std
    	#include
    	
    	#define SOCKET int
    	#define INVALID_SOCKET (SOCKET)(~0)
    	#define SOCKET_ERROR (-1)
    #endif
    
    #include"CMD.h"//   
    #include"mytimer.hpp"//    
    #include
    #include
    #include 
    #include
    #include
    
    //      
    #ifndef RECV_BUFFER_SIZE
    	#define RECV_BUFFER_SIZE 4096
    #endif 
    
    //     
    #define _THREAD_COUNT 4
    
    //     
    class ClientSocket
    {
         
    public:
    	//   
    	ClientSocket(SOCKET sockfd = INVALID_SOCKET)
    	{
         
    		_sockfd = sockfd;
    		//      
    		_Msg_buf = new char[RECV_BUFFER_SIZE*10];
    		_Len_buf = 0; 
    	}
    	//   
    	virtual ~ClientSocket()
    	{
         
    		delete[] _Msg_buf;
    	}
    	
    	//  socket 
    	SOCKET GetSockfd()
    	{
         
    		return _sockfd;
    	}
    	
    	//      
    	char* MsgBuf()
    	{
         
    		return _Msg_buf;
    	} 
    	
    	//          
    	int GetLen()
    	{
         
    		return _Len_buf;	
    	} 
    	
    	//         
    	void SetLen(int len)
    	{
         
    		_Len_buf = len;
    	} 
    		
    private:	
    	SOCKET _sockfd;
    	//      
    	char *_Msg_buf;//      
    	int _Len_buf;//         
    }; 
    
    //    
    class INetEvent
    {
         
    public:
    	//       
    	virtual void OnLeave(ClientSocket* pClient) = 0;
    private:	
    };
    
    //   
    class CellServer
    {
         
    public:
    	//   
    	CellServer(SOCKET sock = INVALID_SOCKET)
    	{
         
    		_sock = sock; 
    		_pThread = nullptr;
    		_pNetEvent = nullptr;
    		_recvCount = 0; 
    		//      
    		_Recv_buf = new char[RECV_BUFFER_SIZE];
    	}
    	//  
    	~CellServer()
    	{
         
    		delete[] _Recv_buf;
    		//  socket 
    		CloseSocket();
    		_sock = INVALID_SOCKET;
    	} 
    	
    	//     
    	void setEventObj(INetEvent* event)
    	{
         
    		_pNetEvent = event; 
    	}
    	
    	//  socket 
    	void CloseSocket()
    	{
         
    		if(INVALID_SOCKET != _sock) 
    		{
         
    #ifdef _WIN32
    			//     socket
    			for(int n=0; n<_clients.size(); ++n)
    			{
         
    				closesocket(_clients[n]->GetSockfd());
    				delete _clients[n];
    			}
    			//  socket
    			closesocket(_sock); 
    			//  windows socket    
    			WSACleanup(); 
    #else
    			//     socket
    			for(int n=0; n<_clients.size(); ++n)
    			{
         
    				close(_clients[n]->GetSockfd());
    				delete _clients[n];
    			}
    			//  socket/LINUX
    			close(_sock);
    #endif
    			_sock = INVALID_SOCKET;
    			_clients.clear();
    		}
    	} 
    	
    	
    	//        
    	bool IsRun()
    	{
         
    		return _sock != INVALID_SOCKET; 
    	}
    	
    	//           
    	bool OnRun()
    	{
         
    		while(IsRun())
    		{
         
    			//                  
    			if(_clientsBuf.size() > 0)
    			{
         
    				std::lock_guard<std::mutex> lock(_mutex);//   
    				for(auto client :_clientsBuf)
    				{
         
    					_clients.push_back(client);
    				}
    				_clientsBuf.clear();
    			} 
    			//                
    			if(_clients.empty())
    			{
         
    				std::chrono::milliseconds t(1);//      
    				std::this_thread::sleep_for(t);
    				continue;
    			}
    			fd_set fdRead;//     
    			FD_ZERO(&fdRead);//     
    			SOCKET maxSock = _clients[0]->GetSockfd();//  socket 
    			//          read   
    			for(int n=_clients.size()-1; n>=0; --n)
    			{
         
    				FD_SET(_clients[n]->GetSockfd(),&fdRead);
    				if(maxSock < _clients[n]->GetSockfd())
    				{
         
    					maxSock = _clients[n]->GetSockfd();
    				}
    			}
    			//select    select 
    			int ret = select(maxSock+1,&fdRead,0,0,0); 
    			if(ret<0)
    			{
         
    				printf("select    
    "
    ); CloseSocket(); return false; } // socket for(int n=0; n<_clients.size(); ++n) { if(FD_ISSET(_clients[n]->GetSockfd(),&fdRead)) { if(-1 == RecvData(_clients[n]))// { std::vector<ClientSocket*>::iterator iter = _clients.begin()+n;// if(iter != _clients.end())// { if(_pNetEvent)// { _pNetEvent->OnLeave(_clients[n]); } delete _clients[n]; _clients.erase(iter);// } } } } //printf("
    ");
    } } // int RecvData(ClientSocket *t_client)// { _recvCount++;// // int buf_len = recv(t_client->GetSockfd(), _Recv_buf, RECV_BUFFER_SIZE, 0); if(buf_len<=0) { printf("
    "
    ); return -1; } // memcpy(t_client->MsgBuf() + t_client->GetLen(), _Recv_buf, buf_len); // t_client->SetLen(t_client->GetLen() + buf_len); // while(t_client->GetLen() >= sizeof(DataHeader))// { // DataHeader* header = (DataHeader*)t_client->MsgBuf(); // if(t_client->GetLen() >= header->date_length) { // int size = t_client->GetLen() - header->date_length; // NetMsg(header,t_client->GetSockfd()); // memcpy(t_client->MsgBuf(), t_client->MsgBuf() + header->date_length, size); // t_client->SetLen(size); } else { // break; } } return 0; } // void NetMsg(DataHeader *head,SOCKET temp_socket) { //printf(" , :%d, :%d
    ",head->cmd,head->date_length);
    switch(head->cmd) { case CMD_LOGIN:// { Login *login = (Login*)head; /* */ //printf("%s
    :%s
    ",login->UserName,login->PassWord);
    LoginResult *result = new LoginResult; result->Result = 1; //SendData(result,temp_socket); } break; case CMD_LOGOUT:// { Logout *logout = (Logout*)head; /* */ //printf("%s
    ",logout->UserName);
    LogoutResult *result = new LogoutResult(); result->Result = 1; //SendData(result,temp_socket); } break; default:// { head->cmd = CMD_ERROR; head->date_length = 0; //SendData(head,temp_socket); } break; } } // void addClient(ClientSocket* client) { std::lock_guard<std::mutex> lock(_mutex); //_mutex.lock(); _clientsBuf.push_back(client); //_mutex.unlock(); } // void Start() { _pThread = new std::thread(std::mem_fun(&CellServer::OnRun),this); } // int GetClientCount() { return _clients.size() + _clientsBuf.size(); } private: // char *_Recv_buf;// //socket SOCKET _sock; // std::vector<ClientSocket*> _clients;// // std::vector<ClientSocket*> _clientsBuf; std::mutex _mutex;// // std::thread* _pThread; // INetEvent* _pNetEvent; public: std::atomic_int _recvCount;// }; // class TcpServer : INetEvent { public: // TcpServer() { _sock = INVALID_SOCKET; } // virtual ~TcpServer() { // socket CloseSocket(); } // socket 1 int InitSocket() { #ifdef _WIN32 // windows socket 2,x WORD ver = MAKEWORD(2,2); WSADATA dat; if(0 != WSAStartup(ver,&dat)) { return -1;//-1 } #endif // socket if(INVALID_SOCKET != _sock) { printf("
    "
    ,_sock); CloseSocket();// } _sock = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); if(INVALID_SOCKET == _sock) { return 0;//0 socket } return 1; } // IP/ int Bind(const char* ip,unsigned short port) { // if(INVALID_SOCKET == _sock) { InitSocket(); } // IP sockaddr_in _myaddr = { }; _myaddr.sin_family = AF_INET;//IPV4 _myaddr.sin_port = htons(port);// #ifdef _WIN32 if(ip)//ip { _myaddr.sin_addr.S_un.S_addr = inet_addr(ip);//IP } else { _myaddr.sin_addr.S_un.S_addr = INADDR_ANY;//IP } #else if(ip)//ip { _myaddr.sin_addr.s_addr = inet_addr(ip);//IP } else { _myaddr.sin_addr.s_addr = INADDR_ANY;//IP } #endif if(SOCKET_ERROR == bind(_sock,(sockaddr*)&_myaddr,sizeof(sockaddr_in)))//socket ( )sockaddr { printf("
    "
    ); return 0; } else { printf("
    %d
    "
    ,port); return 1; } } // int Listen(int n) { // if(INVALID_SOCKET == _sock) { printf(" IP
    "
    ); return 0; } // if(SOCKET_ERROR == listen(_sock,n))// { printf("
    "
    ); return 0; } else { printf("
    "
    ); return 1; } } // int Accept() { // sockaddr_in clientAddr = { };// sockadd int addr_len = sizeof(sockaddr_in);// sockadd SOCKET temp_socket = INVALID_SOCKET;// #ifdef _WIN32 temp_socket = accept(_sock,(sockaddr*)&clientAddr,&addr_len);// #else temp_socket = accept(_sock,(sockaddr*)&clientAddr,(socklen_t*)&addr_len);// #endif if(INVALID_SOCKET == temp_socket)// { printf(" , SOCKET
    "
    ,temp_socket); return 0; } else { //printf(" count: %d
    IP :%s
    ", _clients.size(), inet_ntoa(clientAddr.sin_addr));
    // //NewUserJoin *user_join = new NewUserJoin(); //strcpy(user_join->UserName,inet_ntoa(clientAddr.sin_addr)); //SendDataToAll(user_join); // AddClientToServer(new ClientSocket(temp_socket)); return 1; } } // void AddClientToServer(ClientSocket* pClient) { _clients.push_back(pClient); // auto pMinServer = _cellServers[0]; for(auto pCellServer : _cellServers) { if(pMinServer->GetClientCount() > pCellServer->GetClientCount()) { pMinServer = pCellServer; } } pMinServer->addClient(pClient); } // void Start() { for(int n=0; n<_THREAD_COUNT; n++) { // auto ser = new CellServer(_sock); _cellServers.push_back(ser); ser->setEventObj(this); ser->Start(); } } // socket void CloseSocket() { if(INVALID_SOCKET != _sock) { #ifdef _WIN32 // socket for(int n=0; n<_clients.size(); ++n) { closesocket(_clients[n]->GetSockfd()); delete _clients[n]; } // socket closesocket(_sock); // windows socket WSACleanup(); #else // socket for(int n=0; n<_clients.size(); ++n) { close(_clients[n]->GetSockfd()); delete _clients[n]; } // socket/LINUX close(_sock); #endif _sock = INVALID_SOCKET; _clients.clear(); } } // bool IsRun() { return _sock != INVALID_SOCKET; } // bool OnRun() { if(IsRun()) { time4msg();// fd_set fdRead;// //fd_set fdWrite; //fd_set fdExcept; FD_ZERO(&fdRead);// //FD_ZERO(&fdWrite); //FD_ZERO(&fdExcept); FD_SET(_sock,&fdRead);// //FD_SET(_sock,&fdWrite); //FD_SET(_sock,&fdExcept); timeval s_t = { 0,0};//select //select select int ret = select(_sock+1,&fdRead,0,0,&s_t); if(ret<0) { printf("select
    "
    ); CloseSocket(); return false; } if(FD_ISSET(_sock,&fdRead))// socket { FD_CLR(_sock,&fdRead);// Accept();// } return true; } return false; } // void time4msg() { auto t1 = _time.GetSecond(); if(1.0 <= t1) { int recvCount = 0; for(auto ser: _cellServers) { recvCount += ser->_recvCount; ser->_recvCount = 0; } // socket printf("time,socket,clients,recvCount
    "
    , t1, _sock, _clients.size(),(int)(recvCount/t1)); _time.UpDate(); } } // int SendData(DataHeader *head,SOCKET temp_socket) { if(IsRun() && head) { send(temp_socket,(const char*)head,head->date_length,0); return 1; } return 0; } // void SendDataToAll(DataHeader *head) { for(int n=0;n<_clients.size();++n) { SendData(head, _clients[n]->GetSockfd()); } } // void OnLeave(ClientSocket* pClient) { // for(int n=0; n<_clients.size(); n++) { if(_clients[n] == pClient) { auto iter = _clients.begin() + n; if(iter != _clients.end()) { _clients.erase(iter);// } } } } private: //socket SOCKET _sock; std::vector<ClientSocket*> _clients;// std::vector<CellServer*> _cellServers;// // mytimer _time; }; #endif

    4.サービス側サンプルコードserver.cpp
    #include"TcpServer.hpp"
     
    int main() 
    {
         
    	printf("Welcome
    "
    ); // tcp TcpServer *tcp1 = new TcpServer(); // socket tcp1->InitSocket(); // IP tcp1->Bind(NULL,8888); // tcp1->Listen(5); // tcp1->Start(); // while(tcp1->IsRun()) { tcp1->OnRun(); } // tcp1->CloseSocket(); printf(" , "); getchar(); return 0; }