[c++]スレッドプールの簡単な例(with barrier)

5433 ワード

简単に1版のスレッドプールを実现して、epollをいじって慌ただしく书くため、、、、、もし何か问题があるならばも面倒に指摘して、、、ありがとうございます?
 
addTaskでタスクをコミットします.タスクはWorkerを継承しvoid run()を実装する必要があります.
// MyThreadPool/MyThreadPool.h                                                                                                                                                                    
#ifndef __MY_THREAD_POOL_H__
#define __MY_THREAD_POOL_H__
#include 
#include 
#include 
#include 
#include 
using namespace std;

class Worker;
class MyThread;
class MyThreadPool;

class Worker
{
    public:
        Worker(){};
        virtual ~Worker(){printf("[ERROR] error ~Worker()
");}; virtual void run(){printf("[ERROR] error run()
");}; }; class MyThread { public: MyThreadPool* pool; pthread_t* m_thread; MyThread():pool(){}; ~MyThread(){}; bool init(); bool join(); static void* run(void* ptr); }; class MyThreadPool { public: vector m_threads; vector task_vec; bool stopFlag; pthread_attr_t m_attr; pthread_mutex_t m_lock; pthread_barrier_t m_barr; bool open(const int& num); bool join(); bool getTask(Worker*& task); bool addTask(Worker* task); }; #endif
// MyThreadPool/MyThreadPool.cpp                                                                                                                                                                  
#include "MyThreadPool.h"
bool MyThread::init()
{
    m_thread = (pthread_t*)malloc(sizeof(pthread_t)); //    
    pthread_create(m_thread, &(pool->m_attr), MyThread::run, pool );
    return true;
}
bool MyThread::join()
{
    pthread_join(*m_thread,NULL);
    if(m_thread != NULL) free(m_thread);
    return true;
}
void* MyThread::run(void* ptr)
{
    MyThreadPool* pool = (MyThreadPool*)ptr;
    pthread_barrier_wait(&(pool->m_barr));
//  printf("start run!
"); Worker* task; while(1) { if( pool->stopFlag && pool->task_vec.empty() ) break; // printf("try:get
"); if(pool->getTask(task)) { // printf("get this task :%x
", task); task->run(); } else usleep(100000); } return NULL; } bool MyThreadPool::open(const int& num) { pthread_attr_init(&m_attr); pthread_mutex_init(&m_lock, NULL); pthread_barrier_init(&m_barr, NULL, num+1); // pthread_attr_setdetachstate(&m_attr, PTHREAD_CREATE_JOINABLE);// joinable pthread_attr_setstacksize(&m_attr, 1024000); // ,b stopFlag=false; m_threads.reserve(num); for(int i = 0; i < num; ++i) { MyThread* p = new MyThread(); p->pool = this; p->init(); m_threads.push_back(p); // printf("open one thread : %x
", p); } pthread_barrier_wait(&m_barr); pthread_attr_destroy(&m_attr); return true; } bool MyThreadPool::join() { stopFlag=true; for(int i = 0; i < m_threads.size(); ++i) { m_threads[i]->join(); // printf("join one thread : %x
", m_threads[i]); delete m_threads[i]; m_threads[i] = NULL; } pthread_mutex_destroy(&m_lock); pthread_barrier_destroy(&m_barr); return true; } bool MyThreadPool::getTask(Worker*& task) { // printf("getTask
"); pthread_mutex_lock(&m_lock); if(!task_vec.empty()) { task = *task_vec.begin(); task_vec.erase(task_vec.begin()); } else task = NULL; pthread_mutex_unlock(&m_lock); // printf("get %x, size:%d
",task, task_vec.size()); return task==NULL ? false : true; } bool MyThreadPool::addTask(Worker* task) { pthread_mutex_lock(&m_lock); task_vec.push_back(task); pthread_mutex_unlock(&m_lock); // printf("add %x, size:%d
",task, task_vec.size()); return true; }
// main/main.cpp                                                                                                                                                                                  
#include 
#include 
#include "MyThreadPool/MyThreadPool.h"
using namespace std;
class A : public Worker
{
    public:
        int a,b;
        int ret;
        A(){};
        virtual ~A(){};
        virtual void run(){
            //printf("A.run() start, %d %d
", a,b); printf("curr_task : %x | %d %d
", this,a,b); ret = a + b; sleep(1); //printf("A.run() finish, %d
", ret); }; }; int main(int argc,char **argv) { MyThreadPool pool; pool.open(3); for(int i = 0 ; i < 8 ;i ++) { A *a = new A(); a->a = i; a->b = 10; pool.addTask(a); //printf("add task!, %d
", i); } pool.join(); return 0; }

その後、個1を実現する.チェーンテーブル置換vector 2.タスクが終了するかどうかを問い合わせるインタフェースと、タスクを直接終了するインタフェースを提供します.
おや、、また穴を掘って、、、