[c++]スレッドプールの簡単な例(with barrier)
5433 ワード
简単に1版のスレッドプールを実现して、epollをいじって慌ただしく书くため、、、、、もし何か问题があるならばも面倒に指摘して、、、ありがとうございます?
addTaskでタスクをコミットします.タスクはWorkerを継承しvoid run()を実装する必要があります.
その後、個1を実現する.チェーンテーブル置換vector 2.タスクが終了するかどうかを問い合わせるインタフェースと、タスクを直接終了するインタフェースを提供します.
おや、、また穴を掘って、、、
addTaskでタスクをコミットします.タスクはWorkerを継承しvoid run()を実装する必要があります.
// MyThreadPool/MyThreadPool.h
#ifndef __MY_THREAD_POOL_H__
#define __MY_THREAD_POOL_H__
#include
#include
#include
#include
#include
using namespace std;
class Worker;
class MyThread;
class MyThreadPool;
class Worker
{
public:
Worker(){};
virtual ~Worker(){printf("[ERROR] error ~Worker()
");};
virtual void run(){printf("[ERROR] error run()
");};
};
class MyThread
{
public:
MyThreadPool* pool;
pthread_t* m_thread;
MyThread():pool(){};
~MyThread(){};
bool init();
bool join();
static void* run(void* ptr);
};
class MyThreadPool
{
public:
vector m_threads;
vector task_vec;
bool stopFlag;
pthread_attr_t m_attr;
pthread_mutex_t m_lock;
pthread_barrier_t m_barr;
bool open(const int& num);
bool join();
bool getTask(Worker*& task);
bool addTask(Worker* task);
};
#endif
// MyThreadPool/MyThreadPool.cpp
#include "MyThreadPool.h"
bool MyThread::init()
{
m_thread = (pthread_t*)malloc(sizeof(pthread_t)); //
pthread_create(m_thread, &(pool->m_attr), MyThread::run, pool );
return true;
}
bool MyThread::join()
{
pthread_join(*m_thread,NULL);
if(m_thread != NULL) free(m_thread);
return true;
}
void* MyThread::run(void* ptr)
{
MyThreadPool* pool = (MyThreadPool*)ptr;
pthread_barrier_wait(&(pool->m_barr));
// printf("start run!
");
Worker* task;
while(1)
{
if( pool->stopFlag && pool->task_vec.empty() )
break;
// printf("try:get
");
if(pool->getTask(task))
{
// printf("get this task :%x
", task);
task->run();
}
else usleep(100000);
}
return NULL;
}
bool MyThreadPool::open(const int& num)
{
pthread_attr_init(&m_attr);
pthread_mutex_init(&m_lock, NULL);
pthread_barrier_init(&m_barr, NULL, num+1); //
pthread_attr_setdetachstate(&m_attr, PTHREAD_CREATE_JOINABLE);// joinable
pthread_attr_setstacksize(&m_attr, 1024000); // ,b
stopFlag=false;
m_threads.reserve(num);
for(int i = 0; i < num; ++i)
{
MyThread* p = new MyThread();
p->pool = this;
p->init();
m_threads.push_back(p);
// printf("open one thread : %x
", p);
}
pthread_barrier_wait(&m_barr);
pthread_attr_destroy(&m_attr);
return true;
}
bool MyThreadPool::join()
{
stopFlag=true;
for(int i = 0; i < m_threads.size(); ++i)
{
m_threads[i]->join();
// printf("join one thread : %x
", m_threads[i]);
delete m_threads[i];
m_threads[i] = NULL;
}
pthread_mutex_destroy(&m_lock);
pthread_barrier_destroy(&m_barr);
return true;
}
bool MyThreadPool::getTask(Worker*& task)
{
// printf("getTask
");
pthread_mutex_lock(&m_lock);
if(!task_vec.empty())
{
task = *task_vec.begin();
task_vec.erase(task_vec.begin());
}
else
task = NULL;
pthread_mutex_unlock(&m_lock);
// printf("get %x, size:%d
",task, task_vec.size());
return task==NULL ? false : true;
}
bool MyThreadPool::addTask(Worker* task)
{
pthread_mutex_lock(&m_lock);
task_vec.push_back(task);
pthread_mutex_unlock(&m_lock);
// printf("add %x, size:%d
",task, task_vec.size());
return true;
}
// main/main.cpp
#include
#include
#include "MyThreadPool/MyThreadPool.h"
using namespace std;
class A : public Worker
{
public:
int a,b;
int ret;
A(){};
virtual ~A(){};
virtual void run(){
//printf("A.run() start, %d %d
", a,b);
printf("curr_task : %x | %d %d
", this,a,b);
ret = a + b;
sleep(1);
//printf("A.run() finish, %d
", ret);
};
};
int main(int argc,char **argv)
{
MyThreadPool pool;
pool.open(3);
for(int i = 0 ; i < 8 ;i ++)
{
A *a = new A();
a->a = i;
a->b = 10;
pool.addTask(a);
//printf("add task!, %d
", i);
}
pool.join();
return 0;
}
その後、個1を実現する.チェーンテーブル置換vector 2.タスクが終了するかどうかを問い合わせるインタフェースと、タスクを直接終了するインタフェースを提供します.
おや、、また穴を掘って、、、