Linux単純高同時モデル-Epoll+スレッドプール
まずlocker.hのファイルは,信号量,反発量,条件変数をカプセル化した.
オンライン・スレッド・プールのタスク・キューには反発量の保護が必要です.タスク・キューにタスクが到着した場合、pthread_を待機するように起動する必要があります.cond_wait()のスレッドは、スレッドプールが停止したときに起動する必要があります.pthread_が呼び出されます.cond_broadcast().
locker.hファイル:
thread_pool.hファイル.
threadnumスレッドを作成し、pthread_を呼び出します.detach()はスレッドを分離し,スレッドが終了し,リソースを自動的に回収する.(前のブログのスレッドプールにバグがあり、不完全で、スレッドプールが終了した場合、すべてのスレッドを正常に終了させることはできません)
epollをカプセル化しました.
EpollServer.hの中のBaseTask.hとTask.hは別のファイルに置くべきです.ここは便利ですね.はは.
次は簡単なデモのテストです.
コードはUbuntuでコンパイルされます.次に、コンカレント量をサポートできる量を更新します.
-------------------------------------------------------------------------------------------------
オンライン・スレッド・プールのタスク・キューには反発量の保護が必要です.タスク・キューにタスクが到着した場合、pthread_を待機するように起動する必要があります.cond_wait()のスレッドは、スレッドプールが停止したときに起動する必要があります.pthread_が呼び出されます.cond_broadcast().
locker.hファイル:
#ifndef _LOCKER_H_
#define _LOCKER_H_
#include
#include
#include
/* */
class sem_locker
{
private:
sem_t m_sem;
public:
//
sem_locker()
{
if(sem_init(&m_sem, 0, 0) != 0)
printf("sem init error
");
}
//
~sem_locker()
{
sem_destroy(&m_sem);
}
//
bool wait()
{
return sem_wait(&m_sem) == 0;
}
//
bool add()
{
return sem_post(&m_sem) == 0;
}
};
/* locker*/
class mutex_locker
{
private:
pthread_mutex_t m_mutex;
public:
mutex_locker()
{
if(pthread_mutex_init(&m_mutex, NULL) != 0)
printf("mutex init error!");
}
~mutex_locker()
{
pthread_mutex_destroy(&m_mutex);
}
bool mutex_lock() //lock mutex
{
return pthread_mutex_lock(&m_mutex) == 0;
}
bool mutex_unlock() //unlock
{
return pthread_mutex_unlock(&m_mutex) == 0;
}
};
/* locker*/
class cond_locker
{
private:
pthread_mutex_t m_mutex;
pthread_cond_t m_cond;
public:
// m_mutex and m_cond
cond_locker()
{
if(pthread_mutex_init(&m_mutex, NULL) != 0)
printf("mutex init error");
if(pthread_cond_init(&m_cond, NULL) != 0)
{ // , mutex
pthread_mutex_destroy(&m_mutex);
printf("cond init error");
}
}
// destroy mutex and cond
~cond_locker()
{
pthread_mutex_destroy(&m_mutex);
pthread_cond_destroy(&m_cond);
}
//
bool wait()
{
int ans = 0;
pthread_mutex_lock(&m_mutex);
ans = pthread_cond_wait(&m_cond, &m_mutex);
pthread_mutex_unlock(&m_mutex);
return ans == 0;
}
//
bool signal()
{
return pthread_cond_signal(&m_cond) == 0;
}
// all
bool broadcast()
{
return pthread_cond_broadcast(&m_cond) == 0;
}
};
#endif
thread_pool.hファイル.
threadnumスレッドを作成し、pthread_を呼び出します.detach()はスレッドを分離し,スレッドが終了し,リソースを自動的に回収する.(前のブログのスレッドプールにバグがあり、不完全で、スレッドプールが終了した場合、すべてのスレッドを正常に終了させることはできません)
#ifndef _PTHREAD_POOL_
#define _PTHREAD_POOL_
#include "locker.h"
#include
#include
#include
#include
#include
#include
template
class threadpool
{
private:
int thread_number; //
//int max_task_number; //
pthread_t *all_threads; //
std::queue task_queue; //
mutex_locker queue_mutex_locker; //
//sem_locker queue_sem_locker; //
cond_locker queue_cond_locker; //cond
bool is_stop; //
public:
threadpool(int thread_num = 20);
~threadpool();
bool append_task(T *task); //
void start(); //
void stop(); //
private:
// 。 run()
static void *worker(void *arg);
void run();
T *getTask(); //
};
template
threadpool::threadpool(int thread_num):
thread_number(thread_num),is_stop(false), all_threads(NULL)
{ //
if(thread_num <= 0)
printf("threadpool can't init because thread_number = 0");
all_threads = new pthread_t[thread_number];
if(all_threads == NULL)
printf("can't init threadpool because thread array can't new");
}
template
threadpool::~threadpool()
{
delete []all_threads;
stop();
}
template
void threadpool::stop() //
{
is_stop = true;
//queue_sem_locker.add();
queue_cond_locker.broadcast();
}
template
void threadpool::start() //
{
for(int i = 0; i < thread_number; ++i)
{
//printf("create the %dth pthread
", i);
if(pthread_create(all_threads + i, NULL, worker, this) != 0)
{// ,
delete []all_threads;
throw std::exception();
}
if(pthread_detach(all_threads[i]))
{// ,
delete []all_threads;
throw std::exception();
}
}
}
//
template
bool threadpool::append_task(T *task) //
{ //
queue_mutex_locker.mutex_lock();
bool is_signal = task_queue.empty();
//
task_queue.push(task);
queue_mutex_locker.mutex_unlock();
//
if(is_signal)
{
queue_cond_locker.signal();
}
return true;
}
template
void *threadpool::worker(void *arg) //
{
threadpool *pool = (threadpool *)arg;
pool->run();
return pool;
}
template
T* threadpool::getTask() //
{
T *task = NULL;
queue_mutex_locker.mutex_lock();
if(!task_queue.empty())
{
task = task_queue.front();
task_queue.pop();
}
queue_mutex_locker.mutex_unlock();
return task;
}
template
void threadpool::run()
{
while(!is_stop){
T *task = getTask();
if(task == NULL) // ,
queue_cond_locker.wait();
else //
task->doit();
}
//for test
//printf("exit%d
", (unsigned long)pthread_self());
}
#endif
epollをカプセル化しました.
EpollServer.hの中のBaseTask.hとTask.hは別のファイルに置くべきです.ここは便利ですね.はは.
#ifndef _EPOLL_SERVER_H_
#define _EPOLL_SERVER_H_
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
//#include
#include "thread_pool.h"
#define MAX_EVENT 1024 //epoll_events
#define MAX_BUFFER 2048 //Buffer
class BaseTask
{
public:
virtual void doit() = 0;
};
class Task : public BaseTask
{
private:
int sockfd;
char order[MAX_BUFFER];
public:
Task(char *str, int fd) : sockfd(fd)
{
memset(order, '\0', MAX_BUFFER);
strcpy(order, str);
}
void doit() //
{
//do something of the order
//printf("%s
", order);
snprintf(order, MAX_BUFFER - 1, "somedata
");
write(sockfd, order, strlen(order));
}
};
class EpollServer
{
private:
bool is_stop; // epoll_wait
int threadnum; //
int sockfd; // fd
int port; //
int epollfd; //Epoll fd
threadpool *pool; //
//char address[20];
epoll_event events[MAX_EVENT]; //epoll events
struct sockaddr_in bindAddr; // sockaddr
public://
EpollServer()
{}
EpollServer(int ports, int thread) : is_stop(false) , threadnum(thread) ,
port(ports), pool(NULL)
{
}
~EpollServer() //
{
delete pool;
}
void init();
void epoll();
static int setnonblocking(int fd) // fd
{
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}
static void addfd(int epollfd, int sockfd, bool oneshot) // Epoll fd
{//oneshot , fd, , false
epoll_event event;
event.data.fd = sockfd;
event.events = EPOLLIN | EPOLLET;
if(oneshot)
{
event.events |= EPOLLONESHOT;
}
epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &event); // fd
EpollServer::setnonblocking(sockfd);
}
};
void EpollServer::init() //EpollServer
{
bzero(&bindAddr, sizeof(bindAddr));
bindAddr.sin_family = AF_INET;
bindAddr.sin_port = htons(port);
bindAddr.sin_addr.s_addr = htonl(INADDR_ANY);
// Socket
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if(sockfd < 0)
{
printf("EpollServer socket init error
");
return;
}
int ret = bind(sockfd, (struct sockaddr *)&bindAddr, sizeof(bindAddr));
if(ret < 0)
{
printf("EpollServer bind init error
");
return;
}
ret = listen(sockfd, 10);
if(ret < 0)
{
printf("EpollServer listen init error
");
return;
}
//create Epoll
epollfd = epoll_create(1024);
if(epollfd < 0)
{
printf("EpollServer epoll_create init error
");
return;
}
pool = new threadpool(threadnum); //
}
void EpollServer::epoll()
{
pool->start(); //
//
addfd(epollfd, sockfd, false);
while(!is_stop)
{// epoll_wait
int ret = epoll_wait(epollfd, events, MAX_EVENT, -1);
if(ret < 0) //
{
printf("epoll_wait error
");
break;
}
for(int i = 0; i < ret; ++i)
{
int fd = events[i].data.fd;
if(fd == sockfd) //
{
struct sockaddr_in clientAddr;
socklen_t len = sizeof(clientAddr);
int confd = accept(sockfd, (struct sockaddr *)
&clientAddr, &len);
EpollServer::addfd(epollfd, confd, false);
}
else if(events[i].events & EPOLLIN) // fd
{
char buffer[MAX_BUFFER];
readagain: memset(buffer, 0, sizeof(buffer));
int ret = read(fd, buffer, MAX_BUFFER - 1);
if(ret == 0) // fd , Epoll fd
{
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = fd;
epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev);
shutdown(fd, SHUT_RDWR);
printf("%d logout
", fd);
continue;
}
else if(ret < 0)// ,
{
if(errno == EAGAIN)
{
printf("read error! read again
");
goto readagain;
break;
}
}
else// ,
{
BaseTask *task = new Task(buffer, fd);
pool->append_task(task);
}
}
else
{
printf("something else had happened
");
}
}
}
close(sockfd);// 。
pool->stop();
}
#endif
次は簡単なデモのテストです.
#include "EpollServer.h"
int main(int argc, char const *argv[])
{
if(argc != 3)
{
printf("usage %s port threadnum
", argv[0]);
return -1;
}
int port = atoi(argv[1]);
if(port == 0)
{
printf("port must be Integer
");
return -1;
}
int threadnum = atoi(argv[2]);
if(port == 0)
{
printf("threadnum must be Integer
");
return -1;
}
EpollServer *epoll = new EpollServer(port, threadnum);
epoll->init();
epoll->epoll();
return 0;
}
コードはUbuntuでコンパイルされます.次に、コンカレント量をサポートできる量を更新します.
-------------------------------------------------------------------------------------------------