libuvベースのTCP設計
私はずっとプラットフォームにまたがるネットワークライブラリを探しています.boostとACEは膨大で、考えません.libevent,libev,libuvを比較した後、最終的にlibuv.libuvドキュメントが少なく、例も簡単で、tcpにはecho-serverの例しかありません.ネット上でもパッケージの例を探しましたが、以下のようになっています.
libsourceyライブラリは、多くのライブラリをカプセル化しています.libuvのパッケージは他のコードと結合しており、剥離しにくい.http://sourcey.com/libuv-cpp-wrappers/C++11パッケージの、残念ながらVS 10はC++11を完全にサポートしていないhttps://github.com/larroy/uvpp C++パッケージ https://github.com/keepallsimple/uvpp
私はraw tcp serverを実現したいと思っています.何万ものリンク数をサポートしています.ネット上で見つけたものは私に合わないので、各例を参照して自分でパッケージするしかありません.
コードはgitにアップロードされました: https://github.com/wqvbjhc/libuv_tcp
例として、クライアントはuv_を超える20回以上を同時に実行することができる.writeでassertエラーが発生し、原因が見つかりません
サーバは数十回の接続を受信できます.シミュレーション環境がないため、何百何千もテストされていません.
libsourceyライブラリは、多くのライブラリをカプセル化しています.libuvのパッケージは他のコードと結合しており、剥離しにくい.http://sourcey.com/libuv-cpp-wrappers/C++11パッケージの、残念ながらVS 10はC++11を完全にサポートしていないhttps://github.com/larroy/uvpp C++パッケージ https://github.com/keepallsimple/uvpp
私はraw tcp serverを実現したいと思っています.何万ものリンク数をサポートしています.ネット上で見つけたものは私に合わないので、各例を参照して自分でパッケージするしかありません.
/***************************************
* @file tcpsocket.h
* @brief libuv tcp , log4z
* @details
* @author phata, [email protected]
* @date 2014-5-13
* @mod 2014-5-13 phata .
,
****************************************/
#ifndef TCPSocket_H
#define TCPSocket_H
#include "uv.h"
#include
#include
#include
#include "tcpsocket.h"
#include "log4z.h"
std::string GetUVError(int retcode)
{
std::string err;
err = uv_err_name(retcode);
err +=":";
err += uv_strerror(retcode);
return std::move(err);
}
namespace uv
{
/*****************************************TCP Server*************************************************************/
TCPServer::TCPServer(uv_loop_t* loop)
:newconcb_(nullptr), isinit_(false)
{
loop_ = loop;
}
TCPServer::~TCPServer()
{
close();
LOGI("tcp server exit.");
}
// --
bool TCPServer::init()
{
if (isinit_) {
return true;
}
if (!loop_) {
errmsg_ = "loop is null on tcp init.";
LOGE(errmsg_);
return false;
}
int iret = uv_mutex_init(&mutex_handle_);
if (iret) {
errmsg_ = GetUVError(iret);
LOGE(errmsg_);
return false;
}
iret = uv_tcp_init(loop_,&server_);
if (iret) {
errmsg_ = GetUVError(iret);
LOGE(errmsg_);
return false;
}
isinit_ = true;
server_.data = this;
//iret = uv_tcp_keepalive(&server_, 1, 60);//
//if (iret) {
// errmsg_ = GetUVError(iret);
// return false;
//}
return true;
}
void TCPServer::close()
{
for (auto it = clients_list_.begin(); it!=clients_list_.end(); ++it) {
auto data = it->second;
uv_close((uv_handle_t*)data->client_handle,AfterClientClose);
}
clients_list_.clear();
LOGI("close server");
if (isinit_) {
uv_close((uv_handle_t*) &server_, AfterServerClose);
LOGI("close server");
}
isinit_ = false;
uv_mutex_destroy(&mutex_handle_);
}
bool TCPServer::run(int status)
{
LOGI("server runing.");
int iret = uv_run(loop_,(uv_run_mode)status);
if (iret) {
errmsg_ = GetUVError(iret);
LOGE(errmsg_);
return false;
}
return true;
}
// --
bool TCPServer::setNoDelay(bool enable)
{
int iret = uv_tcp_nodelay(&server_, enable ? 1 : 0);
if (iret) {
errmsg_ = GetUVError(iret);
LOGE(errmsg_);
return false;
}
return true;
}
bool TCPServer::setKeepAlive(int enable, unsigned int delay)
{
int iret = uv_tcp_keepalive(&server_, enable , delay);
if (iret) {
errmsg_ = GetUVError(iret);
LOGE(errmsg_);
return false;
}
return true;
}
// server
bool TCPServer::bind(const char* ip, int port)
{
struct sockaddr_in bind_addr;
int iret = uv_ip4_addr(ip, port, &bind_addr);
if (iret) {
errmsg_ = GetUVError(iret);
LOGE(errmsg_);
return false;
}
iret = uv_tcp_bind(&server_, (const struct sockaddr*)&bind_addr,0);
if (iret) {
errmsg_ = GetUVError(iret);
LOGE(errmsg_);
return false;
}
LOGI("server bind ip="<second->writebuffer.len < len) {
itfind->second->writebuffer.base = (char*)realloc(itfind->second->writebuffer.base,len);
itfind->second->writebuffer.len = len;
}
memcpy(itfind->second->writebuffer.base,data,len);
uv_buf_t buf = uv_buf_init((char*)itfind->second->writebuffer.base,len);
int iret = uv_write(&itfind->second->write_req, (uv_stream_t*)itfind->second->client_handle, &buf, 1, AfterSend);
if (iret) {
errmsg_ = GetUVError(iret);
LOGE(errmsg_);
return false;
}
return true;
}
// -
void TCPServer::acceptConnection(uv_stream_t *server, int status)
{
if (!server->data) {
return;
}
TCPServer *tcpsock = (TCPServer *)server->data;
int clientid = tcpsock->GetAvailaClientID();
clientdata* cdata = new clientdata(clientid);//uv_close
cdata->tcp_server = tcpsock;//
int iret = uv_tcp_init(tcpsock->loop_, cdata->client_handle);//
if (iret) {
delete cdata;
tcpsock->errmsg_ = GetUVError(iret);
LOGE(tcpsock->errmsg_);
return;
}
iret = uv_accept((uv_stream_t*)&tcpsock->server_, (uv_stream_t*) cdata->client_handle);
if ( iret) {
tcpsock->errmsg_ = GetUVError(iret);
uv_close((uv_handle_t*) cdata->client_handle, NULL);
delete cdata;
LOGE(tcpsock->errmsg_);
return;
}
tcpsock->clients_list_.insert(std::make_pair(clientid,cdata));//
if (tcpsock->newconcb_) {
tcpsock->newconcb_(clientid);
}
LOGI("new client("<client_handle<client_handle, onAllocBuffer, AfterServerRecv);//
return;
}
// -
void TCPServer::setrecvcb(int clientid, server_recvcb cb )
{
auto itfind = clients_list_.find(clientid);
if (itfind != clients_list_.end()) {
itfind->second->recvcb_ = cb;
}
}
// -
void TCPServer::setnewconnectcb(newconnect cb )
{
newconcb_ = cb;
}
//
void TCPServer::onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf)
{
if (!handle->data) {
return;
}
clientdata *client = (clientdata*)handle->data;
*buf = client->readbuffer;
}
void TCPServer::AfterServerRecv(uv_stream_t *handle, ssize_t nread, const uv_buf_t* buf)
{
if (!handle->data) {
return;
}
clientdata *client = (clientdata*)handle->data;// recv clientdata
if (nread < 0) {/* Error or EOF */
TCPServer *server = (TCPServer *)client->tcp_server;
if (nread == UV_EOF) {
fprintf(stdout," (%d) ,
",client->client_id);
LOGW(" ("<client_id<client_id);
LOGW(" ("<client_id<client_id<DeleteClient(client->client_id);// ,
return;
} else if (0 == nread) {/* Everything OK, but nothing read. */
} else if (client->recvcb_) {
client->recvcb_(client->client_id,buf->base,nread);
}
}
//
void TCPServer::AfterSend(uv_write_t *req, int status)
{
if (status < 0) {
LOGE(" :"<data;
LOGI("client "<client_id<second->client_handle)) {
uv_read_stop((uv_stream_t*)itfind->second->client_handle);
}
uv_close((uv_handle_t*)itfind->second->client_handle,AfterClientClose);
clients_list_.erase(itfind);
LOGI(" "<SetLoggerMonthdir(LOG4Z_MAIN_LOGGER_ID, true);
zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerDisplay(LOG4Z_MAIN_LOGGER_ID,false);
zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerLevel(LOG4Z_MAIN_LOGGER_ID,LOG_LEVEL_DEBUG);
zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerLimitSize(LOG4Z_MAIN_LOGGER_ID,100);
if (logpath) {
zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerPath(LOG4Z_MAIN_LOGGER_ID,logpath);
}
zsummer::log4z::ILog4zManager::GetInstance()->Start();
}
/*****************************************TCP Client*************************************************************/
TCPClient::TCPClient(uv_loop_t* loop)
:recvcb_(nullptr),userdata_(nullptr)
,connectstatus_(CONNECT_DIS)
, isinit_(false)
{
readbuffer_ = uv_buf_init((char*) malloc(BUFFERSIZE), BUFFERSIZE);
writebuffer_ = uv_buf_init((char*) malloc(BUFFERSIZE), BUFFERSIZE);
loop_ = loop;
connect_req_.data = this;
write_req_.data = this;
}
TCPClient::~TCPClient()
{
free(readbuffer_.base);
readbuffer_.base = nullptr;
readbuffer_.len = 0;
free(writebuffer_.base);
writebuffer_.base = nullptr;
writebuffer_.len = 0;
close();
LOGI(" ("<connectip_.c_str(), pclient->connectport_, &bind_addr);
if (iret) {
pclient->errmsg_ = GetUVError(iret);
LOGE(pclient->errmsg_);
return;
}
iret = uv_tcp_connect(&pclient->connect_req_, &pclient->client_, (const sockaddr*)&bind_addr, AfterConnect);
if (iret) {
pclient->errmsg_ = GetUVError(iret);
LOGE(pclient->errmsg_);
return;
}
fprintf(stdout,"client(%p) ConnectThread end, connect status %d
",pclient, pclient->connectstatus_);
pclient->run();
}
void TCPClient::ConnectThread6( void* arg )
{
TCPClient *pclient = (TCPClient*)arg;
LOGI(" ("<connectip_.c_str(), pclient->connectport_, &bind_addr);
if (iret) {
pclient->errmsg_ = GetUVError(iret);
LOGE(pclient->errmsg_);
return;
}
iret = uv_tcp_connect(&pclient->connect_req_, &pclient->client_, (const sockaddr*)&bind_addr, AfterConnect);
if (iret) {
pclient->errmsg_ = GetUVError(iret);
LOGE(pclient->errmsg_);
return;
}
fprintf(stdout,"client(%p) ConnectThread end, connect status %d
",pclient, pclient->connectstatus_);
LOGI(" ("<connectstatus_);
pclient->run();
}
void TCPClient::AfterConnect(uv_connect_t* handle, int status)
{
fprintf(stdout,"start after connect
");
TCPClient *pclient = (TCPClient*)handle->handle->data;
if (status) {
pclient->connectstatus_ = CONNECT_ERROR;
fprintf(stdout,"connect error:%s
",GetUVError(status));
return;
}
int iret = uv_read_start(handle->handle, onAllocBuffer, AfterClientRecv);//
if (iret) {
fprintf(stdout,"uv_read_start error:%s
",GetUVError(iret));
pclient->connectstatus_ = CONNECT_ERROR;
} else {
pclient->connectstatus_ = CONNECT_FINISH;
}
LOGI(" ("<data) {
return;
}
TCPClient *client = (TCPClient*)handle->data;
*buf = client->readbuffer_;
}
void TCPClient::AfterClientRecv(uv_stream_t *handle, ssize_t nread, const uv_buf_t* buf)
{
if (!handle->data) {
return;
}
TCPClient *client = (TCPClient*)handle->data;// recv TCPClient
if (nread < 0) {
if (nread == UV_EOF) {
fprintf(stdout," (%p)
",handle);
LOGW(" ");
} else if (nread == UV_ECONNRESET) {
fprintf(stdout," (%p)
",handle);
LOGW(" ");
} else {
fprintf(stdout," (%p) :%s
",handle,GetUVError(nread));
LOGW(" "< 0 && client->recvcb_) {
client->recvcb_(buf->base,nread,client->userdata_);
}
}
//
void TCPClient::AfterSend(uv_write_t *req, int status)
{
TCPClient *client = (TCPClient *)req->handle->data;
uv_mutex_unlock(&client->write_mutex_handle_);
if (status < 0) {
LOGE(" :"<SetLoggerMonthdir(LOG4Z_MAIN_LOGGER_ID, true);
zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerDisplay(LOG4Z_MAIN_LOGGER_ID,false);
zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerLevel(LOG4Z_MAIN_LOGGER_ID,LOG_LEVEL_DEBUG);
zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerLimitSize(LOG4Z_MAIN_LOGGER_ID,100);
if (logpath) {
zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerPath(LOG4Z_MAIN_LOGGER_ID,logpath);
}
zsummer::log4z::ILog4zManager::GetInstance()->Start();
}
}
コードはgitにアップロードされました: https://github.com/wqvbjhc/libuv_tcp
例として、クライアントはuv_を超える20回以上を同時に実行することができる.writeでassertエラーが発生し、原因が見つかりません
サーバは数十回の接続を受信できます.シミュレーション環境がないため、何百何千もテストされていません.