evconnlistener tcp高同時サービス

7164 ワード

libevent自体はtcpサービスをよく実現していますが、libeventはwindowsの下でデフォルトでselectモデルです.selectはIOCPモデルに比べて同時量と性能が非常に悪い.IOCPモデルを使用する必要があります

1.IOCPモデルのtcpサービスを有効にする


1.1 Windowsスレッドセキュリティモデルの有効化
evthread_use_windows_threads

1.2 IOCPモデルの有効化
event_config *cfg = event_config_new();
event_config_set_flag(cfg, EVENT_BASE_FLAG_STARTUP_IOCP);

// cpu ,cpu *2+2
event_config_set_num_cpus_hint(cfg,8)

1.3 tcpサービスの有効化
evconnlistener_new_bind

1.4接続されたソケットデータの傍受
m_pbuf_eve = bufferevent_socket_new(base, fd, options);

// , 
if (m_pbuf_eve)
{
	bufferevent_setcb(m_pbuf_eve, sock_read_data_cb, sock_write_data_cb, sock_event_cb, nullptr);
	bufferevent_enable(m_pbuf_eve, EV_READ | EV_WRITE);
}

完全なソース:
// sock 
void accept_cb(struct evconnlistener * listener, evutil_socket_t fd, struct sockaddr *addr, int socklen, void *arg)
{
	SockMgr::instance().new_sockitem(evconnlistener_get_base(listener), fd, addr, socklen);
}

// 
void accept_errorcb(struct evconnlistener *listener, void *arg)
{
	cout << "accept error" << endl;
}

int _tmain(int argc, _TCHAR* argv[])
{
	CSocketStartup start_up;
	evthread_use_windows_threads();

	// Select , IOCP
	event_config *cfg = event_config_new();
	event_config_set_flag(cfg, EVENT_BASE_FLAG_STARTUP_IOCP);
	event_base *pbase = event_base_new_with_config(cfg);
	if (pbase == NULL)
	{
		cout << "init error,event base is null" << endl;
		return -1;
	}

	sockaddr_in addr = { 0 };
	addr.sin_family = AF_INET;
	addr.sin_port = htons(SERVER_PORT);
	inet_pton(AF_INET, SERVER_IP, &addr.sin_addr);
	evconnlistener *listener = evconnlistener_new_bind(pbase, accept_cb, NULL, LEV_OPT_CLOSE_ON_FREE | LEV_OPT_THREADSAFE, -1, (struct sockaddr*)&addr, sizeof(addr));
	if (listener == nullptr)
	{
		cout << "listener error ,listener is null" << endl;
		return -1;
	}
	evconnlistener_set_error_cb(listener, accept_errorcb);
	cout << "server listen:" << SERVER_IP << ":" << SERVER_PORT << endl;
	
	event_base_dispatch(pbase);
	return 0;
}


2.すべてのTCP接続を管理する

  • は、すべてのリンク、および通知メッセージ
  • の処理を容易にする.
  • オブジェクトをリンクすると、管理オブジェクトがより便利になります.
  • 受信データの非同期処理については、処理が完了する後にデータを送信し、受信データの場所で時間のかかる操作をしないでください
  • .
    sock_mgr.h
    #ifndef __CLIENT_SOCK_H__
    #define __CLIENT_SOCK_H__
    #include "stdafx.h"
    #include 
    #include 
    
    class CCritical
    {
    public:
    	CCritical(CRITICAL_SECTION &cs) :m_cs(cs)
    	{
    		::EnterCriticalSection(&m_cs);
    	}
    	~CCritical()
    	{
    		::LeaveCriticalSection(&m_cs);
    	}
    private:
    	CRITICAL_SECTION &m_cs;
    };
    class SockItem
    {
    public:
    	SockItem(struct event_base *base, evutil_socket_t fd, int options = BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE, struct sockaddr *addr = NULL, int socklen = 0);
    	~SockItem();
    
    	operator bufferevent *(){return m_pbuf_eve;}
    	std::string ip(){ return m_strip; }
    	unsigned short port(){ return m_port; }
    
    	int send(const char *pdata, const int nlen);
    private:
    	bufferevent *m_pbuf_eve;
    	std::string m_strip;
    	unsigned short m_port;
    };
    
    class SockMgr
    {
    public:
    	static SockMgr& instance();
    	std::shared_ptr new_sockitem(event_base *pbase, evutil_socket_t fd, struct sockaddr *addr, int socklen);
    	void free_sockitem(bufferevent *m_pbuf_eve);
    	void broadcast(const char *pdata, const int nlen);
    private:
    	SockMgr();
    private:
    	std::map > m_map_sockitem;// 
    	CRITICAL_SECTION m_cs;
    	static SockMgr* s_pthis;
    };
    
    
    #endif
    
    
    

    sock_mgr.cpp
    #include "stdafx.h"
    #include "event2/buffer.h"
    #include "sock_mgr.h"
    #include 
    
    static void sock_read_data_cb(struct bufferevent *bev, void *ctx)
    {
    	int nread_size = 0;
    	char szData[4096] = "";
    	std::string strData;
    	do
    	{
    		nread_size = bufferevent_read(bev, szData, 4096);
    		strData = szData;
    	} while (nread_size == 4096);
    
    	cout << "sock:" << bufferevent_getfd(bev) << ",read data:" << strData< 0)
    	{
    		struct sockaddr_in *pin = (struct sockaddr_in *)addr;
    		m_strip = inet_ntoa(pin->sin_addr);
    		m_port = pin->sin_port;
    	}
    }
    
    SockItem::~SockItem()
    {
    	if (m_pbuf_eve)
    		bufferevent_free(m_pbuf_eve);
    	m_pbuf_eve = nullptr;
    }
    
    int SockItem::send(const char *pdata, const int nlen)
    {
    	if (m_pbuf_eve)
    		return bufferevent_write(m_pbuf_eve,pdata,nlen);
    	return -1;
    }
    
    /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
    SockMgr * SockMgr::s_pthis = NULL;
    
    SockMgr::SockMgr()
    {
    	::InitializeCriticalSection(&m_cs);
    }
    
    SockMgr& SockMgr::instance()
    {
    	if (s_pthis == NULL)
    		s_pthis = new SockMgr();
    	return *s_pthis;
    }
    
    std::shared_ptr SockMgr::new_sockitem(event_base *pbase, evutil_socket_t fd, struct sockaddr *addr, int socklen)
    {
    	std::shared_ptr pItem(new SockItem(pbase, fd, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE, addr, socklen));
    	bufferevent *pbuf_eve = *pItem;
    	if (pbuf_eve == NULL)
    	{
    		cout << "new connection bufferevent failed" << endl;
    		return NULL;
    	}
    
    	cout << "sock:" << bufferevent_getfd(*pItem) << ",ip:" << pItem->ip() << ":" << pItem->port() << " connected" << endl;
    
    	{
    		CCritical lk(m_cs);
    		m_map_sockitem[pbuf_eve] = pItem;
    	}
    	
    	return pItem;
    }
    
    void SockMgr::free_sockitem(bufferevent *pbuf_eve)
    {
    	std::shared_ptr pitem = nullptr;
    	
    	{
    		CCritical lk(m_cs);
    		if (m_map_sockitem.count(pbuf_eve))
    			pitem = m_map_sockitem[pbuf_eve];
    		m_map_sockitem.erase(pbuf_eve);
    	}
    
    	cout << "sock:" << bufferevent_getfd(*pitem) << ",ip:" << pitem->ip() << ":" << pitem->port() << " disconnected" << endl;
    }
    
    void SockMgr::broadcast(const char *pdata, const int nlen)
    {
    	std::map> temp_map_sockitem;
    	{
    		CCritical lk(m_cs);
    		temp_map_sockitem = m_map_sockitem;
    	}
    
    	for (std::map>::iterator it = temp_map_sockitem.begin();
    		it != temp_map_sockitem.end();++it)
    	{
    		if (it->second.get())
    		{
    			it->second->send(pdata, nlen);
    		}
    	}
    }
    

    後はパケット処理(粘着注意)のみを行い、非同期処理が完了したらmgrで応答結果を送信すればOKです.