ThreadシリーズのRAIDパッケージ
8659 ワード
c++でThreadをカプセル化し、スレッドプライベートデータTSD、スレッドプール:
実行結果:
test::constructor worker 1//単一スレッドの出力test::deconsturctor//以上スレッドプライベートデータスレッド終了時にworker 2//スレッドプールを解放する出力worker 3
ヘッダファイルhpp
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<boost/shared_ptr.hpp>
#include<boost/weak_ptr.hpp>
#include<boost/noncopyable.hpp>
#include<boost/function.hpp>
#include<boost/bind.hpp>
#include<boost/ptr_container/ptr_vector.hpp>
#include<string>
#include<deque>
#include<algorithm>
#include<sys/syscall.h>
#include<stdio.h>
#include<assert.h>
#include"Mutex.hpp"
using namespace std;
using namespace boost;
/*
* Thread
*/
__thread pid_t t_cacheTid=0;// ID ID
class Thread:noncopyable{
public:
typedef function<void()> ThreadFunc;//
explicit Thread(const ThreadFunc& a,const string& name=string()):started_(false),
joinded_(false),pthreadID_(0),tid_(new pid_t(0)),func_(a),name_(name){
}
~Thread(){
if(started_&&!joinded_){
pthread_detach(pthreadID_);//
}
}
void start();
/*
{
assert(!started_);
started_=true;
if(pthread_create(&pthreadID_,NULL,&startThread,NULL)){
started_=false;
abort();//
}
}
*///###1### http://cboard.cprogramming.com/cplusplus-programming/113981-passing-class-member-function-pthread_create.html
void join(){//
assert(started_);
assert(!joinded_);
joinded_=true;
pthread_join(pthreadID_,NULL);
}
pid_t tid() const{
if(t_cacheTid==0){// t_cacheTid ID ID
t_cacheTid=syscall(SYS_gettid);
}
return t_cacheTid;
}
const string& name() const{
return name_;
}
//void* startThread(void* arg){//###1###
void startThread(){
func_();
}
private:
bool started_;
bool joinded_;
pthread_t pthreadID_;
shared_ptr<pid_t> tid_;
ThreadFunc func_;
string name_;
};
void* threadFun(void* arg){//
Thread* thread=static_cast<Thread*>(arg);
thread->startThread();
}
void Thread::start(){
assert(!started_);
started_=true;
if(pthread_create(&pthreadID_,NULL,threadFun,this)){
started_=false;
abort();//
}
}
/*
* TSD
*/
template<typename T>
class ThreadLocal:noncopyable{
public:
ThreadLocal(){
pthread_key_create(&pkey_,&destructor);// pkey_ pthread_key_delete destructor
}
~ThreadLocal(){
pthread_key_delete(pkey_);// destructor
}
T& value(){// , singleton
T* perThreadValue=static_cast<T*>(pthread_getspecific(pkey_));
if(!perThreadValue){
T* newObj=new T();
pthread_setspecific(pkey_,newObj);
perThreadValue=newObj;
}
return *perThreadValue;
}
private:
static void destructor(void* x){//
T* obj=static_cast<T*>(x);
delete obj;
}
private:
pthread_key_t pkey_;
};
/*
*
*/
class ThreadPool:noncopyable{
public:
typedef function<void()> Task;//
explicit ThreadPool(const string& name=string()):mutex_(),cond_(mutex_),name_(name),running_(false){
}
~ThreadPool(){
if(running_){
stop();//
}
}
void start(int numThreads){
assert(threads_.empty());
running_=true;
threads_.reserve(numThreads);
for(int i=0;i<numThreads;i++){
threads_.push_back(new Thread(bind(&ThreadPool::runInThread,this)));// runInThread
threads_[i].start();
}
}
void stop(){
running_=false;// ,
cond_.notifyALL();//
for_each(threads_.begin(),threads_.end(),bind(&Thread::join,_1));//
}
void run(const Task& task){
if(running_){//###4###
if(threads_.empty()){//
task();
}
else{
MutexLockGuard guard(mutex_);// RAII mutex
queue_.push_back(task);
cond_.notify();
}
}
else{
printf("
");
}
}
bool running(){//
return running_;
}
private:
void runInThread(){//
while(running_){//###2###
Task task(take());
if(task){//task NULL
task();
}
}
}
Task take(){
MutexLockGuard guard(mutex_);
while(queue_.empty()&&running_){//###3### ###2### 。 , A ###2### , running_=false ,A ###3### , 。 ###4###
cond_.wait();//
}
Task task;
if(!queue_.empty()){
task=queue_.front();
queue_.pop_front();
}
return task;
}
Mutex mutex_;
Condition cond_;
string name_;
ptr_vector<Thread> threads_;//
deque<Task> queue_;
bool running_;
};
/*
*
*/
class test{
public:
test(){
printf("test::constructor
");
}
~test(){
printf("test::deconsturctor
");
}
};
void worker1(){
ThreadLocal<test> one;
test two=one.value();//
printf("worker1
");
}
void worke2(){
printf("worker2
");
}
void worker3(){
printf("worker3
");
sleep(1);
}
int main(){
Thread one(worker1);
one.start();
one.join();
ThreadPool two;
two.start(2);
two.run(worke2);
two.run(worker3);
two.stop();
return 0;
}
実行結果:
test::constructor worker 1//単一スレッドの出力test::deconsturctor//以上スレッドプライベートデータスレッド終了時にworker 2//スレッドプールを解放する出力worker 3
ヘッダファイルhpp
#include<iostream>
#include<string>
#include<unistd.h>
#include<pthread.h>
#include<assert.h>
#include<boost/utility.hpp>
using namespace std;
using namespace boost;
/*
*
*/
class Mutex:noncopyable{
public:
Mutex(){
pthread_mutex_init(&mutex,NULL);
}
void lock(){
pthread_mutex_lock(&mutex);
}
void unlock(){
pthread_mutex_unlock(&mutex);
}
pthread_mutex_t& get(){
return mutex;
}
private:
pthread_mutex_t mutex;
};
/*
* RAII
*/
class MutexLockGuard:noncopyable{
public:
explicit MutexLockGuard(Mutex& mutex):mutex_(mutex){
mutex_.lock();
}
~MutexLockGuard(){
mutex_.unlock();
}
private:
Mutex& mutex_;
};
/*
*
*/
class Condition:noncopyable{
public:
explicit Condition(Mutex& mutex):mutex_(mutex){
pthread_cond_init(&pcond_,NULL);
}
~Condition(){
pthread_cond_destroy(&pcond_);
}
void wait(){
pthread_cond_wait(&pcond_,&mutex_.get());
}
void notify(){
pthread_cond_signal(&pcond_);
}
void notifyALL(){
pthread_cond_broadcast(&pcond_);
}
private:
Mutex& mutex_;
pthread_cond_t pcond_;
};
/*
*
*/
class CountDownLatch{
public:
CountDownLatch(int count):mutex_(),condition_(mutex_),count_(count){}
void wait(){
MutexLockGuard lock(mutex_);
while(count_>0)
condition_.wait();
}
void countDown(){
MutexLockGuard lock(mutex_);
--count_;
if(count_==0)
condition_.notifyALL();
}
private:
mutable Mutex mutex_;
Condition condition_;
int count_;
};