libuvベースのTCP設計


私はずっとクロスプラットフォームのネットワークライブラリを探していました。boostとACEは巨大すぎるので候補から外しました。libevent、libev、libuvを比較した結果、最終的にlibuvを選びました。ただしlibuvはドキュメントが少なく、サンプルも単純で、TCPについてはecho-serverの例しかありません。ネット上でラッパー(カプセル化)の例も探しましたが、見つかったのは以下のとおりです。
libsourcey:多くのライブラリをラップしたライブラリです。libuvのラッパー部分は他のコードと結合しており、切り離しにくい。 http://sourcey.com/libuv-cpp-wrappers/ 。C++11によるラッパー:残念ながらVS 10はC++11を完全にはサポートしていません。 https://github.com/larroy/uvp​p 。C++によるラッパー: https://github.com/keepallsimple/uvpp
       私は数万の接続をサポートするraw TCPサーバーを実装したいと考えています。ネット上で見つけたものはどれも自分の用途に合わなかったので、各サンプルを参考にして自分でラップするしかありませんでした。
/***************************************
* @file     tcpsocket.h
* @brief    TCP server/client wrapper classes built on libuv; logging uses log4z
* @details
* @author   phata, [email protected]
* @date     2014-5-13
* @mod      2014-5-13  phata              .            
                                       ,          
****************************************/
#ifndef TCPSocket_H
#define TCPSocket_H
#include "uv.h"
#include <cstdlib>
#include <map>
#include <string>
#define BUFFERSIZE (1024*1024)

namespace uv
{
// Callback invoked by TCPServer::acceptConnection after a new client has been
// accepted and registered; receives the id assigned to that client.
typedef void (*newconnect)(int clientid);
// Per-client receive callback used by TCPServer: client id, pointer to the
// received bytes and their count. The buffer is the client's read buffer, so
// presumably it is only valid for the duration of the call — TODO confirm.
typedef void (*server_recvcb)(int cliendid, const char* buf, int bufsize);
// Receive callback used by TCPClient: received bytes, their count, and the
// opaque userdata pointer registered together with the callback.
typedef void (*client_recvcb)(const char* buf, int bufsize, void* userdata);

class TCPServer;
class clientdata
{
public:
    clientdata(int clientid):client_id(clientid),recvcb_(nullptr) {
        client_handle = (uv_tcp_t*)malloc(sizeof(*client_handle));
        client_handle->data = this;
        readbuffer = uv_buf_init((char*)malloc(BUFFERSIZE), BUFFERSIZE);
		writebuffer = uv_buf_init((char*)malloc(BUFFERSIZE), BUFFERSIZE);
    }
    virtual ~clientdata() {
        free(readbuffer.base);
        readbuffer.base = nullptr;
        readbuffer.len = 0;

		free(writebuffer.base);
		writebuffer.base = nullptr;
		writebuffer.len = 0;

        free(client_handle);
        client_handle = nullptr;
    }
    int client_id;//   id,  
    uv_tcp_t* client_handle;//     
    TCPServer* tcp_server;//     (              )
    uv_buf_t readbuffer;//     buf
	uv_buf_t writebuffer;//    buf
    uv_write_t write_req;
    server_recvcb recvcb_;//            
};


class TCPServer
{
public:
    TCPServer(uv_loop_t* loop = uv_default_loop());
    virtual ~TCPServer();
    static void StartLog(const char* logpath = nullptr);//    ,          
public:
    //    
    bool Start(const char *ip, int port);//     ,   IP4
    bool Start6(const char *ip, int port);//     ,   IP6
    void close();

    bool setNoDelay(bool enable);
    bool setKeepAlive(int enable, unsigned int delay);

    const char* GetLastErrMsg() const {
        return errmsg_.c_str();
    };

    virtual int  send(int clientid, const char* data, std::size_t len);
    virtual void setnewconnectcb(newconnect cb);
    virtual void setrecvcb(int clientid,server_recvcb cb);//        ,         
protected:
    int GetAvailaClientID()const;//     client id
    bool DeleteClient(int clientid);//         
    //      
    static void AfterServerRecv(uv_stream_t *client, ssize_t nread, const uv_buf_t* buf);
    static void AfterSend(uv_write_t *req, int status);
    static void onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf);
    static void AfterServerClose(uv_handle_t *handle);
	static void AfterClientClose(uv_handle_t *handle);
    static void acceptConnection(uv_stream_t *server, int status);

private:
    bool init();
    bool run(int status = UV_RUN_DEFAULT);
    bool bind(const char* ip, int port);
    bool bind6(const char* ip, int port);
    bool listen(int backlog = 1024);


    uv_tcp_t server_;//     
    std::map clients_list_;//      
	uv_mutex_t mutex_handle_;//  clients_list_
    uv_loop_t *loop_;
    std::string errmsg_;
    newconnect newconcb_;
	bool isinit_;//      ,  close     
};



// TCP client built on a libuv loop. One instance manages a single connection.
// NOTE(review): the implementation runs the loop from a separate connect
// thread (ConnectThread / ConnectThread6 call run()); thread-safety of the
// other members is not established by the visible code — verify before use
// from several threads.
class TCPClient
{
    // Connection is established through connect/connect6.
public:
    // loop: libuv loop the client runs on (defaults to the process-wide loop).
    TCPClient(uv_loop_t* loop = uv_default_loop());
    virtual ~TCPClient();
    static void StartLog(const char* logpath = nullptr);// configures and starts the log4z logger; logpath may be nullptr
public:
    // Basic operations
    virtual bool connect(const char* ip, int port);// connect to an IPv4 address; presumably starts the connect thread — TODO confirm
    virtual bool connect6(const char* ip, int port);// connect to an IPv6 address; presumably starts the connect thread — TODO confirm
    virtual int  send(const char* data, std::size_t len);
    virtual void setrecvcb(client_recvcb cb, void* userdata);// register the receive callback and the userdata pointer passed back to it
    void close();

    // Socket options (setNoDelay controls Nagle's algorithm)
    bool setNoDelay(bool enable);
    bool setKeepAlive(int enable, unsigned int delay);

    // Text of the most recent error recorded by this object.
    const char* GetLastErrMsg() const {
        return errmsg_.c_str();
    };
protected:
    // Static libuv callbacks
    static void AfterConnect(uv_connect_t* handle, int status);
    static void AfterClientRecv(uv_stream_t *client, ssize_t nread, const uv_buf_t* buf);
    static void AfterSend(uv_write_t *req, int status);
    static void onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf);
    static void AfterClose(uv_handle_t *handle);

    static void ConnectThread(void* arg);// thread entry: issues the IPv4 connect request, then runs the loop
    static void ConnectThread6(void* arg);// thread entry: issues the IPv6 connect request, then runs the loop

    bool init();
    bool run(int status = UV_RUN_DEFAULT);
private:
    // Values stored in connectstatus_.
    enum {
        CONNECT_TIMEOUT,
        CONNECT_FINISH,
        CONNECT_ERROR,
        CONNECT_DIS,
    };
    uv_tcp_t client_;// libuv handle of the connection
    uv_loop_t *loop_;
    uv_write_t write_req_;// request used for writes
    uv_connect_t connect_req_;// request used for connecting
    uv_thread_t connect_threadhanlde_;// handle of the connect thread
    std::string errmsg_;// last error message
    uv_buf_t readbuffer_;// buffer handed to libuv for incoming data
	uv_buf_t writebuffer_;// buffer for outgoing data
	uv_mutex_t write_mutex_handle_;// serialises writes; unlocked in AfterSend once a write has completed

    int connectstatus_;// one of the CONNECT_* values above
    client_recvcb recvcb_;// receive callback (may be nullptr)
    void* userdata_;// opaque pointer passed back to recvcb_
    std::string connectip_;// IP address given to connect/connect6
    int connectport_;// port given to connect/connect6
	bool isinit_;// initialisation flag; presumably consulted by close() — TODO confirm
};

}


#endif // TCPSocket_H
#include "tcpsocket.h"
#include "log4z.h"

// Formats a libuv status code as "<error name>:<error description>".
//
// retcode: a libuv error code (negative UV_E* value).
// Returns the formatted text by value.
std::string GetUVError(int retcode)
{
    // Guard against a null name/description: constructing std::string from a
    // null pointer is undefined behaviour, and some libuv versions may not
    // return a string for unknown codes.
    const char* name = uv_err_name(retcode);
    const char* text = uv_strerror(retcode);

    std::string err;
    if (name) {
        err += name;
    }
    err += ":";
    if (text) {
        err += text;
    }
    // Return the local directly; wrapping it in std::move would only disable
    // copy elision.
    return err;
}

namespace uv
{
/*****************************************TCP Server*************************************************************/
// Stores the loop to run on; no libuv resource is created until init().
TCPServer::TCPServer(uv_loop_t* loop)
    : loop_(loop)
    , newconcb_(nullptr)
    , isinit_(false)
{
}


// Shuts the server down via close() and logs the exit.
TCPServer::~TCPServer()
{
    this->close();
    LOGI("tcp server exit.");
}

//      --         
// One-time setup of the mutex and the listening handle.
// Returns true when the server is (or already was) initialised; on failure
// stores the reason in errmsg_, logs it and returns false.
bool TCPServer::init()
{
    if (isinit_) {
        return true;  // already set up, nothing to do
    }

    if (loop_ == nullptr) {
        errmsg_ = "loop is null on tcp init.";
        LOGE(errmsg_);
        return false;
    }

    const int mutex_rc = uv_mutex_init(&mutex_handle_);
    if (mutex_rc != 0) {
        errmsg_ = GetUVError(mutex_rc);
        LOGE(errmsg_);
        return false;
    }

    const int tcp_rc = uv_tcp_init(loop_, &server_);
    if (tcp_rc != 0) {
        errmsg_ = GetUVError(tcp_rc);
        LOGE(errmsg_);
        return false;
    }

    isinit_ = true;
    // Lets the static libuv callbacks recover this object from the handle.
    server_.data = this;
    return true;
}

// Requests closing of every client handle and of the listening handle, then
// releases the mutex.
//
// Safe to call more than once and on a server that was never initialised:
// the listening handle and the mutex are only touched while isinit_ is set.
// (The destructor always calls close(), so an unconditional
// uv_mutex_destroy would hit an uninitialised or already destroyed mutex.)
//
// NOTE(review): if init() fails after uv_mutex_init succeeded, isinit_ stays
// false and the mutex is not destroyed here; that failure path needs its own
// cleanup.
void TCPServer::close()
{
    // uv_close is asynchronous; AfterClientClose runs later on the loop.
    for (auto it = clients_list_.begin(); it != clients_list_.end(); ++it) {
        auto data = it->second;
        uv_close((uv_handle_t*)data->client_handle, AfterClientClose);
    }
    clients_list_.clear();

    LOGI("close server");
    if (!isinit_) {
        return;  // nothing was initialised (or close() already ran)
    }

    uv_close((uv_handle_t*) &server_, AfterServerClose);
    LOGI("close server");
    isinit_ = false;
    uv_mutex_destroy(&mutex_handle_);
}

bool TCPServer::run(int status)
{
    LOGI("server runing.");
    int iret = uv_run(loop_,(uv_run_mode)status);
    if (iret) {
        errmsg_ = GetUVError(iret);
        LOGE(errmsg_);
        return false;
    }
    return true;
}
//    --         
// Applies uv_tcp_nodelay to the listening handle.
// Returns true on success; on failure stores and logs the libuv error.
bool TCPServer::setNoDelay(bool enable)
{
    const int flag = enable ? 1 : 0;
    const int rc = uv_tcp_nodelay(&server_, flag);
    if (rc == 0) {
        return true;
    }
    errmsg_ = GetUVError(rc);
    LOGE(errmsg_);
    return false;
}

// Applies uv_tcp_keepalive (enable flag and delay) to the listening handle.
// Returns true on success; on failure stores and logs the libuv error.
bool TCPServer::setKeepAlive(int enable, unsigned int delay)
{
    const int rc = uv_tcp_keepalive(&server_, enable, delay);
    if (rc == 0) {
        return true;
    }
    errmsg_ = GetUVError(rc);
    LOGE(errmsg_);
    return false;
}

//  server    
// Binds the listening handle to an IPv4 address and port.
//
// NOTE(review): this definition is damaged. The end of bind() and the
// beginning of a later function (it looks like the body of a send routine:
// it copies `data` into a client's write buffer and calls uv_write) were lost,
// and the two fragments are fused on the LOGI line below. As written the span
// does not compile (`itfind`, `data` and `len` are not declared here).
// Restore the missing text from the upstream source before changing any logic.
bool TCPServer::bind(const char* ip, int port)
{
    struct sockaddr_in bind_addr;
    // Convert the textual address/port into a sockaddr_in.
    int iret = uv_ip4_addr(ip, port, &bind_addr);
    if (iret) {
        errmsg_ = GetUVError(iret);
        LOGE(errmsg_);
        return false;
    }
    iret = uv_tcp_bind(&server_, (const struct sockaddr*)&bind_addr,0);
    if (iret) {
        errmsg_ = GetUVError(iret);
        LOGE(errmsg_);
        return false;
    }
    // Damaged line: start of the bind log statement fused with a buffer-size check.
    LOGI("server bind ip="<second->writebuffer.len < len) {
        // Grow the client's write buffer when the payload does not fit.
        // NOTE(review): the realloc result is not checked.
        itfind->second->writebuffer.base = (char*)realloc(itfind->second->writebuffer.base,len);
        itfind->second->writebuffer.len = len;
    }
    // The payload is copied so the caller's buffer need not outlive the write.
    memcpy(itfind->second->writebuffer.base,data,len);
    uv_buf_t buf = uv_buf_init((char*)itfind->second->writebuffer.base,len);
    // NOTE(review): the client's single write_req and writebuffer are reused for
    // every write; a second send before AfterSend has run would reuse a request
    // that is still in flight — verify against libuv's uv_write requirements.
    int iret = uv_write(&itfind->second->write_req, (uv_stream_t*)itfind->second->client_handle, &buf, 1, AfterSend);
    if (iret) {
        errmsg_ = GetUVError(iret);
        LOGE(errmsg_);
        return false;
    }
    return true;
}

//   -      
// libuv connection callback for the listening handle: creates the per-client
// state, accepts the connection, registers the client and notifies the
// new-connection callback.
// server: the listening stream; its data field holds the owning TCPServer.
// status: passed by libuv. NOTE(review): it is never inspected here, so a
//         failed connection attempt is not filtered out before uv_accept.
void TCPServer::acceptConnection(uv_stream_t *server, int status)
{
    if (!server->data) {
        return;
    }
    TCPServer *tcpsock = (TCPServer *)server->data;
    int clientid = tcpsock->GetAvailaClientID();
    clientdata* cdata = new clientdata(clientid);// presumably released in the uv_close callback — TODO confirm
    cdata->tcp_server = tcpsock;// remember the owning server
    int iret = uv_tcp_init(tcpsock->loop_, cdata->client_handle);// initialise the client handle on the server's loop
    if (iret) {
        delete cdata;
        tcpsock->errmsg_ = GetUVError(iret);
        LOGE(tcpsock->errmsg_);
        return;
    }
    iret = uv_accept((uv_stream_t*)&tcpsock->server_, (uv_stream_t*) cdata->client_handle);
    if ( iret) {
        tcpsock->errmsg_ = GetUVError(iret);
        // NOTE(review): uv_close is asynchronous, yet cdata (which owns and frees
        // the handle memory) is deleted immediately afterwards. libuv documents
        // that handle memory must stay valid until the close callback has run —
        // verify; the deletion probably belongs in a close callback.
        uv_close((uv_handle_t*) cdata->client_handle, NULL);
        delete cdata;
        LOGE(tcpsock->errmsg_);
        return;
    }
    tcpsock->clients_list_.insert(std::make_pair(clientid,cdata));// register the client under its id
    if (tcpsock->newconcb_) {
        tcpsock->newconcb_(clientid);
    }
    // NOTE(review): the next line is damaged — the text between the log statement
    // and the call that starts reading (onAllocBuffer / AfterServerRecv callbacks)
    // is missing. Restore it from the upstream source.
    LOGI("new client("<client_handle<client_handle, onAllocBuffer, AfterServerRecv);// start receiving data from the client
    return;
}

//   -        
// Installs the receive callback of one client.
// Does nothing when clientid is not present in clients_list_.
void TCPServer::setrecvcb(int clientid, server_recvcb cb )
{
    const auto pos = clients_list_.find(clientid);
    if (pos == clients_list_.end()) {
        return;  // unknown client id
    }
    pos->second->recvcb_ = cb;
}

//   -       
// Installs the callback that acceptConnection invokes for each new client.
// Passing nullptr disables the notification.
void TCPServer::setnewconnectcb(newconnect cb )
{
    this->newconcb_ = cb;
}

//         
// libuv allocation callback: hands out the client's preallocated read buffer.
// suggested_size is ignored. *buf is left untouched when the handle carries
// no clientdata.
void TCPServer::onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf)
{
    clientdata *owner = static_cast<clientdata*>(handle->data);
    if (owner) {
        *buf = owner->readbuffer;
    }
}

void TCPServer::AfterServerRecv(uv_stream_t *handle, ssize_t nread, const uv_buf_t* buf)
{
    if (!handle->data) {
        return;
    }
    clientdata *client = (clientdata*)handle->data;//    recv   clientdata
    if (nread < 0) {/* Error or EOF */
        TCPServer *server = (TCPServer *)client->tcp_server;
        if (nread == UV_EOF) {
            fprintf(stdout,"   (%d)    ,      
",client->client_id); LOGW(" ("<client_id<client_id); LOGW(" ("<client_id<client_id<DeleteClient(client->client_id);// , return; } else if (0 == nread) {/* Everything OK, but nothing read. */ } else if (client->recvcb_) { client->recvcb_(client->client_id,buf->base,nread); } } // void TCPServer::AfterSend(uv_write_t *req, int status) { if (status < 0) { LOGE(" :"<data; LOGI("client "<client_id<second->client_handle)) { uv_read_stop((uv_stream_t*)itfind->second->client_handle); } uv_close((uv_handle_t*)itfind->second->client_handle,AfterClientClose); clients_list_.erase(itfind); LOGI(" "<SetLoggerMonthdir(LOG4Z_MAIN_LOGGER_ID, true); zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerDisplay(LOG4Z_MAIN_LOGGER_ID,false); zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerLevel(LOG4Z_MAIN_LOGGER_ID,LOG_LEVEL_DEBUG); zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerLimitSize(LOG4Z_MAIN_LOGGER_ID,100); if (logpath) { zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerPath(LOG4Z_MAIN_LOGGER_ID,logpath); } zsummer::log4z::ILog4zManager::GetInstance()->Start(); } /*****************************************TCP Client*************************************************************/ TCPClient::TCPClient(uv_loop_t* loop) :recvcb_(nullptr),userdata_(nullptr) ,connectstatus_(CONNECT_DIS) , isinit_(false) { readbuffer_ = uv_buf_init((char*) malloc(BUFFERSIZE), BUFFERSIZE); writebuffer_ = uv_buf_init((char*) malloc(BUFFERSIZE), BUFFERSIZE); loop_ = loop; connect_req_.data = this; write_req_.data = this; } TCPClient::~TCPClient() { free(readbuffer_.base); readbuffer_.base = nullptr; readbuffer_.len = 0; free(writebuffer_.base); writebuffer_.base = nullptr; writebuffer_.len = 0; close(); LOGI(" ("<connectip_.c_str(), pclient->connectport_, &bind_addr); if (iret) { pclient->errmsg_ = GetUVError(iret); LOGE(pclient->errmsg_); return; } iret = uv_tcp_connect(&pclient->connect_req_, &pclient->client_, (const sockaddr*)&bind_addr, AfterConnect); if (iret) { pclient->errmsg_ 
= GetUVError(iret); LOGE(pclient->errmsg_); return; } fprintf(stdout,"client(%p) ConnectThread end, connect status %d
",pclient, pclient->connectstatus_); pclient->run(); } void TCPClient::ConnectThread6( void* arg ) { TCPClient *pclient = (TCPClient*)arg; LOGI(" ("<connectip_.c_str(), pclient->connectport_, &bind_addr); if (iret) { pclient->errmsg_ = GetUVError(iret); LOGE(pclient->errmsg_); return; } iret = uv_tcp_connect(&pclient->connect_req_, &pclient->client_, (const sockaddr*)&bind_addr, AfterConnect); if (iret) { pclient->errmsg_ = GetUVError(iret); LOGE(pclient->errmsg_); return; } fprintf(stdout,"client(%p) ConnectThread end, connect status %d
",pclient, pclient->connectstatus_); LOGI(" ("<connectstatus_); pclient->run(); } void TCPClient::AfterConnect(uv_connect_t* handle, int status) { fprintf(stdout,"start after connect
"); TCPClient *pclient = (TCPClient*)handle->handle->data; if (status) { pclient->connectstatus_ = CONNECT_ERROR; fprintf(stdout,"connect error:%s
",GetUVError(status)); return; } int iret = uv_read_start(handle->handle, onAllocBuffer, AfterClientRecv);// if (iret) { fprintf(stdout,"uv_read_start error:%s
",GetUVError(iret)); pclient->connectstatus_ = CONNECT_ERROR; } else { pclient->connectstatus_ = CONNECT_FINISH; } LOGI(" ("<data) { return; } TCPClient *client = (TCPClient*)handle->data; *buf = client->readbuffer_; } void TCPClient::AfterClientRecv(uv_stream_t *handle, ssize_t nread, const uv_buf_t* buf) { if (!handle->data) { return; } TCPClient *client = (TCPClient*)handle->data;// recv TCPClient if (nread < 0) { if (nread == UV_EOF) { fprintf(stdout," (%p)
",handle); LOGW(" "); } else if (nread == UV_ECONNRESET) { fprintf(stdout," (%p)
",handle); LOGW(" "); } else { fprintf(stdout," (%p) :%s
",handle,GetUVError(nread)); LOGW(" "< 0 && client->recvcb_) { client->recvcb_(buf->base,nread,client->userdata_); } } // void TCPClient::AfterSend(uv_write_t *req, int status) { TCPClient *client = (TCPClient *)req->handle->data; uv_mutex_unlock(&client->write_mutex_handle_); if (status < 0) { LOGE(" :"<SetLoggerMonthdir(LOG4Z_MAIN_LOGGER_ID, true); zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerDisplay(LOG4Z_MAIN_LOGGER_ID,false); zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerLevel(LOG4Z_MAIN_LOGGER_ID,LOG_LEVEL_DEBUG); zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerLimitSize(LOG4Z_MAIN_LOGGER_ID,100); if (logpath) { zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerPath(LOG4Z_MAIN_LOGGER_ID,logpath); } zsummer::log4z::ILog4zManager::GetInstance()->Start(); } }

コードはGitHubにアップロードしました: https://github.com/wqvbjhc/libuv_tcp
テストしたところ、クライアントを同時に20個以上実行すると uv_write で assert エラーが発生します。原因はまだ分かっていません。
サーバーは数十の接続を受け付けられることを確認しました。負荷を再現できる環境がないため、数百・数千の接続ではテストしていません。