Linux のシンプルな高並行モデル — Epoll + スレッドプール

10961 ワード

まず locker.h では、セマフォ、ミューテックス、条件変数をそれぞれクラスとしてカプセル化しています。
スレッドプールでは、タスクキューをミューテックスで保護する必要があります。タスクキューにタスクが到着したら、pthread_cond_wait() で待機しているスレッドを起こす必要があります。また、スレッドプールを停止するときは pthread_cond_broadcast() を呼び出して、待機中のすべてのスレッドを起こす必要があります。
locker.hファイル:
#ifndef _LOCKER_H_
#define _LOCKER_H_

#include 
#include 
#include 

/*     */
/*
 * sem_locker: thin wrapper around an unnamed POSIX semaphore.
 * The semaphore is created with pshared = 0 and an initial count of 0.
 */
class sem_locker
{
private:
    sem_t m_sem;

public:
    // Initialise the semaphore; a failure is only reported on stdout.
    sem_locker()
    {
        if(sem_init(&m_sem, 0, 0) != 0)
            printf("sem init error\n");
    }

    // Release the semaphore.
    ~sem_locker()
    {
        sem_destroy(&m_sem);
    }

    // Wait on the semaphore (sem_wait). Returns true on success.
    bool wait()
    {
        return sem_wait(&m_sem) == 0;
    }

    // Post to the semaphore (sem_post). Returns true on success.
    bool add()
    {
        return sem_post(&m_sem) == 0;
    }
};

/*
 * mutex_locker: thin wrapper around a pthread mutex with default attributes.
 */
class mutex_locker
{
private:
    pthread_mutex_t m_mutex;

public:
    // Initialise the mutex; a failure is only reported on stdout.
    mutex_locker()
    {
        if(pthread_mutex_init(&m_mutex, NULL) != 0)
            printf("mutex init error!");
    }

    // Destroy the mutex.
    ~mutex_locker()
    {
        pthread_mutex_destroy(&m_mutex);
    }

    // Lock the mutex. Returns true on success.
    bool mutex_lock()
    {
        return pthread_mutex_lock(&m_mutex) == 0;
    }

    // Unlock the mutex. Returns true on success.
    bool mutex_unlock()
    {
        return pthread_mutex_unlock(&m_mutex) == 0;
    }
};

/*
 * cond_locker: a condition variable bundled with its own private mutex.
 *
 * wait() does not test any predicate: a signal()/broadcast() issued while no
 * thread is blocked inside wait() is not remembered, and a wake-up does not
 * guarantee that the awaited state holds. Callers must re-check their own
 * condition after wait() returns.
 */
class cond_locker
{
private:
    pthread_mutex_t m_mutex;
    pthread_cond_t m_cond;

public:
    // Initialise m_mutex and m_cond; failures are only reported on stdout.
    cond_locker()
    {
        if(pthread_mutex_init(&m_mutex, NULL) != 0)
            printf("mutex init error");
        if(pthread_cond_init(&m_cond, NULL) != 0)
        {
            // The condition variable could not be created, so release the mutex.
            // NOTE(review): the destructor will still destroy both objects again.
            pthread_mutex_destroy(&m_mutex);
            printf("cond init error");
        }
    }

    // Destroy the mutex and the condition variable.
    ~cond_locker()
    {
        pthread_mutex_destroy(&m_mutex);
        pthread_cond_destroy(&m_cond);
    }

    // Block on the condition variable until woken. Returns true when
    // pthread_cond_wait() reported success.
    bool wait()
    {
        int ans = 0;
        pthread_mutex_lock(&m_mutex);
        ans = pthread_cond_wait(&m_cond, &m_mutex);
        pthread_mutex_unlock(&m_mutex);
        return ans == 0;
    }

    // Wake one waiting thread. Returns true on success.
    bool signal()
    {
        return pthread_cond_signal(&m_cond) == 0;
    }

    // Wake all waiting threads. Returns true on success.
    bool broadcast()
    {
        return pthread_cond_broadcast(&m_cond) == 0;
    }
};

#endif

thread_pool.hファイル.
threadnum 個のスレッドを作成し、pthread_detach() を呼び出して各スレッドを分離します。分離されたスレッドは、終了時にリソースが自動的に回収されます。(前回のブログで紹介したスレッドプールにはバグがあり不完全で、スレッドプールの終了時にすべてのスレッドを正常に終了させることができませんでした)
#ifndef _PTHREAD_POOL_
#define _PTHREAD_POOL_

#include "locker.h"
#include 
#include 
#include 
#include 
#include 
#include 

/*
 * threadpool<T>: a fixed number of worker threads that take T* tasks from a
 * shared FIFO queue and call task->doit() on each of them.
 *
 * T must provide a member function doit().
 * The queue is guarded by queue_mutex_locker; idle workers block on
 * queue_cond_locker.
 */
template <typename T>
class threadpool
{
private:
    int thread_number;                // number of worker threads
    //int max_task_number;            // maximum number of queued tasks (unused)
    pthread_t *all_threads;           // array of thread_number thread handles
    std::queue<T *> task_queue;       // pending tasks, oldest first
    mutex_locker queue_mutex_locker;  // protects task_queue
    //sem_locker queue_sem_locker;    // semaphore-based alternative (unused)
    cond_locker queue_cond_locker;    // idle workers wait here
    bool is_stop;                     // set by stop(); polled by the workers
public:
    threadpool(int thread_num = 20);
    ~threadpool();
    bool append_task(T *task);  // enqueue a task
    void start();               // create the worker threads
    void stop();                // ask the workers to exit
private:
    // Thread entry point; forwards to run() on the pool passed as arg.
    static void *worker(void *arg);
    void run();
    T *getTask();   // pop the next task, or NULL when the queue is empty
};

template 
threadpool::threadpool(int thread_num):
	thread_number(thread_num),is_stop(false), all_threads(NULL)
{       //    
    if(thread_num <= 0)
	printf("threadpool can't init because thread_number = 0");

    all_threads = new pthread_t[thread_number];
    if(all_threads == NULL)
    	printf("can't init threadpool because thread array can't new");
}

template 
threadpool::~threadpool()
{
    delete []all_threads;
    stop();
}

template 
void threadpool::stop() //     
{
        is_stop = true;
        //queue_sem_locker.add();
        queue_cond_locker.broadcast();
}

template 
void threadpool::start()  //     
{
    for(int i = 0; i < thread_number; ++i)
    {
	//printf("create the %dth pthread
", i); if(pthread_create(all_threads + i, NULL, worker, this) != 0) {// , delete []all_threads; throw std::exception(); } if(pthread_detach(all_threads[i])) {// , delete []all_threads; throw std::exception(); } } } // template bool threadpool::append_task(T *task) // { // queue_mutex_locker.mutex_lock(); bool is_signal = task_queue.empty(); // task_queue.push(task); queue_mutex_locker.mutex_unlock(); // if(is_signal) { queue_cond_locker.signal(); } return true; } template void *threadpool::worker(void *arg) // { threadpool *pool = (threadpool *)arg; pool->run(); return pool; } template T* threadpool::getTask() // { T *task = NULL; queue_mutex_locker.mutex_lock(); if(!task_queue.empty()) { task = task_queue.front(); task_queue.pop(); } queue_mutex_locker.mutex_unlock(); return task; } template void threadpool::run() { while(!is_stop){ T *task = getTask(); if(task == NULL) // , queue_cond_locker.wait(); else // task->doit(); } //for test //printf("exit%d
", (unsigned long)pthread_self()); } #endif

epollをカプセル化しました.
EpollServer.h の中にある BaseTask と Task は、本来は別のファイル(BaseTask.h、Task.h)に置くべきですが、ここでは手軽さを優先して同じファイルにまとめています。
#ifndef _EPOLL_SERVER_H_
#define _EPOLL_SERVER_H_

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
//#include 

#include "thread_pool.h"

#define MAX_EVENT 1024   //epoll_events     
#define MAX_BUFFER 2048  //Buffer     

/*
 * Abstract unit of work: concrete tasks implement doit().
 */
class BaseTask
{
public:
	// Virtual so that deleting a concrete task through a BaseTask pointer
	// runs the derived destructor.
	virtual ~BaseTask() {}
	// Execute the task.
	virtual void doit() = 0;
};

class Task : public BaseTask
{
private:
	int sockfd;
	char order[MAX_BUFFER];
public:
	Task(char *str, int fd) : sockfd(fd)
	{
		memset(order, '\0', MAX_BUFFER);
		strcpy(order, str);
	}
	void doit()  //       
	{
		//do something of the order
		//printf("%s
", order); snprintf(order, MAX_BUFFER - 1, "somedata
"); write(sockfd, order, strlen(order)); } }; class EpollServer { private: bool is_stop; // epoll_wait int threadnum; // int sockfd; // fd int port; // int epollfd; //Epoll fd threadpool *pool; // //char address[20]; epoll_event events[MAX_EVENT]; //epoll events struct sockaddr_in bindAddr; // sockaddr public:// EpollServer() {} EpollServer(int ports, int thread) : is_stop(false) , threadnum(thread) , port(ports), pool(NULL) { } ~EpollServer() // { delete pool; } void init(); void epoll(); static int setnonblocking(int fd) // fd { int old_option = fcntl(fd, F_GETFL); int new_option = old_option | O_NONBLOCK; fcntl(fd, F_SETFL, new_option); return old_option; } static void addfd(int epollfd, int sockfd, bool oneshot) // Epoll fd {//oneshot , fd, , false epoll_event event; event.data.fd = sockfd; event.events = EPOLLIN | EPOLLET; if(oneshot) { event.events |= EPOLLONESHOT; } epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &event); // fd EpollServer::setnonblocking(sockfd); } }; void EpollServer::init() //EpollServer { bzero(&bindAddr, sizeof(bindAddr)); bindAddr.sin_family = AF_INET; bindAddr.sin_port = htons(port); bindAddr.sin_addr.s_addr = htonl(INADDR_ANY); // Socket sockfd = socket(AF_INET, SOCK_STREAM, 0); if(sockfd < 0) { printf("EpollServer socket init error
"); return; } int ret = bind(sockfd, (struct sockaddr *)&bindAddr, sizeof(bindAddr)); if(ret < 0) { printf("EpollServer bind init error
"); return; } ret = listen(sockfd, 10); if(ret < 0) { printf("EpollServer listen init error
"); return; } //create Epoll epollfd = epoll_create(1024); if(epollfd < 0) { printf("EpollServer epoll_create init error
"); return; } pool = new threadpool(threadnum); // } void EpollServer::epoll() { pool->start(); // // addfd(epollfd, sockfd, false); while(!is_stop) {// epoll_wait int ret = epoll_wait(epollfd, events, MAX_EVENT, -1); if(ret < 0) // { printf("epoll_wait error
"); break; } for(int i = 0; i < ret; ++i) { int fd = events[i].data.fd; if(fd == sockfd) // { struct sockaddr_in clientAddr; socklen_t len = sizeof(clientAddr); int confd = accept(sockfd, (struct sockaddr *) &clientAddr, &len); EpollServer::addfd(epollfd, confd, false); } else if(events[i].events & EPOLLIN) // fd { char buffer[MAX_BUFFER]; readagain: memset(buffer, 0, sizeof(buffer)); int ret = read(fd, buffer, MAX_BUFFER - 1); if(ret == 0) // fd , Epoll fd { struct epoll_event ev; ev.events = EPOLLIN; ev.data.fd = fd; epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev); shutdown(fd, SHUT_RDWR); printf("%d logout
", fd); continue; } else if(ret < 0)// , { if(errno == EAGAIN) { printf("read error! read again
"); goto readagain; break; } } else// , { BaseTask *task = new Task(buffer, fd); pool->append_task(task); } } else { printf("something else had happened
"); } } } close(sockfd);// 。 pool->stop(); } #endif

次は簡単なデモのテストです.
#include "EpollServer.h"

int main(int argc, char const *argv[])
{
	if(argc != 3)
	{
		printf("usage %s port threadnum
", argv[0]); return -1; } int port = atoi(argv[1]); if(port == 0) { printf("port must be Integer
"); return -1; } int threadnum = atoi(argv[2]); if(port == 0) { printf("threadnum must be Integer
"); return -1; } EpollServer *epoll = new EpollServer(port, threadnum); epoll->init(); epoll->epoll(); return 0; }

コードは Ubuntu 上でコンパイルしています。サポートできる同時接続数については、後日追記します。
-------------------------------------------------------------------------------------------------