ThreadシリーズのRAIIラッパー


C++でスレッド(Thread)、スレッド固有データ(TSD)、スレッドプールをカプセル化したコード:
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<boost/shared_ptr.hpp>
#include<boost/weak_ptr.hpp>
#include<boost/noncopyable.hpp>
#include<boost/function.hpp>
#include<boost/bind.hpp>
#include<boost/ptr_container/ptr_vector.hpp>
#include<string>
#include<deque>
#include<algorithm>
#include<sys/syscall.h>
#include<stdio.h>
#include<assert.h>
#include"Mutex.hpp"
using namespace std;
using namespace boost;
/*
 *   Thread
 */
// Per-thread cache of the kernel thread ID. Zero means the ID has not been
// fetched yet for the thread that is reading the variable.
__thread pid_t t_cacheTid=0;

/// Non-copyable wrapper around a single POSIX thread that executes a
/// boost::function<void()> callback.
///
/// Usage: construct with the callback, call start() once, then optionally
/// join(). If the object is destroyed after start() without join(), the
/// underlying pthread is detached instead.
class Thread:noncopyable{
    public:
        /// Signature of the callback executed on the new thread.
        typedef function<void()> ThreadFunc;

        /// @param a     callback to run on the new thread
        /// @param name  optional label, only retrievable through name()
        explicit Thread(const ThreadFunc& a,const string& name=string())
            :started_(false),
             joined_(false),
             pthreadID_(0),
             tid_(new pid_t(0)),
             func_(a),
             name_(name){
        }

        /// Detaches the pthread when it was started but never joined.
        ~Thread(){
            const bool neverJoined=started_&&!joined_;
            if(neverJoined){
                pthread_detach(pthreadID_);
            }
        }

        /// Creates the pthread. Defined out of class because pthread_create()
        /// needs a plain function pointer and therefore goes through the free
        /// function threadFun rather than a member function; see
        /// http://cboard.cprogramming.com/cplusplus-programming/113981-passing-class-member-function-pthread_create.html
        void start();

        /// Blocks until the thread finishes. Must be called at most once and
        /// only after start() (both enforced with assert).
        void join(){
            assert(started_);
            assert(!joined_);
            joined_=true;
            pthread_join(pthreadID_,NULL);
        }

        /// Returns the kernel thread ID of the *calling* thread, fetching it
        /// with gettid on first use and caching it in t_cacheTid afterwards.
        /// NOTE(review): this does not report the ID of the thread wrapped by
        /// this object, and tid_ is never consulted — confirm the intent.
        pid_t tid() const{
            if(t_cacheTid!=0){
                return t_cacheTid;
            }
            t_cacheTid=syscall(SYS_gettid);
            return t_cacheTid;
        }

        /// Label passed to the constructor.
        const string& name() const{
            return name_;
        }

        /// Invokes the stored callback; called by threadFun on the new thread.
        void startThread(){
            func_();
        }

    private:
        bool started_;           // set by start()
        bool joined_;            // set by join()
        pthread_t pthreadID_;    // handle filled in by pthread_create()
        shared_ptr<pid_t> tid_;  // allocated as 0; not used elsewhere in this class
        ThreadFunc func_;        // callback run by startThread()
        string name_;            // label returned by name()
};
/// Trampoline with the plain-function signature pthread_create() requires.
///
/// @param arg  the Thread object passed as the last argument of
///             pthread_create() in Thread::start()
/// @return     always NULL; the thread produces no result value
void* threadFun(void* arg){
    Thread* thread=static_cast<Thread*>(arg);
    thread->startThread();
    // Flowing off the end of a non-void function is undefined behaviour,
    // so the thread's exit value is returned explicitly.
    return NULL;
}
/// Launches the pthread, which runs threadFun with this object as argument.
/// Must be called at most once (asserted). Aborts the process when the
/// thread cannot be created.
void Thread::start(){
    assert(!started_);
    // Marked as started before the thread exists; rolled back on failure.
    started_=true;
    const int rc=pthread_create(&pthreadID_,NULL,threadFun,this);
    if(rc!=0){
        started_=false;
        abort();
    }
}

/*
 *       TSD
 */
/// Thread-specific data wrapper: every thread that calls value() gets its own
/// lazily default-constructed T, stored under one pthread key.
///
/// A thread's T is deleted by the key destructor when that thread exits while
/// the key still exists. pthread_key_delete() does not run destructors, so
/// the ThreadLocal destructor frees the value of the thread that destroys it;
/// values still held by other live threads at that moment are not reclaimed.
template<typename T>
class ThreadLocal:noncopyable{
    public:
        /// Creates the key and registers destructor() as its cleanup hook.
        ThreadLocal(){
            const int rc=pthread_key_create(&pkey_,&destructor);
            assert(rc==0);   // using pkey_ after a failed create would be invalid
            (void)rc;        // keeps NDEBUG builds free of unused-variable warnings
        }

        /// Frees the calling thread's value (if any), then deletes the key.
        ~ThreadLocal(){
            T* mine=static_cast<T*>(pthread_getspecific(pkey_));
            pthread_key_delete(pkey_);
            delete mine;     // deleting a null pointer is a no-op
        }

        /// Returns the calling thread's T, creating it on first access.
        T& value(){
            T* perThreadValue=static_cast<T*>(pthread_getspecific(pkey_));
            if(!perThreadValue){
                T* newObj=new T();
                const int rc=pthread_setspecific(pkey_,newObj);
                assert(rc==0);
                (void)rc;
                perThreadValue=newObj;
            }
            return *perThreadValue;
        }

    private:
        /// Key destructor: deletes the T stored for the exiting thread.
        static void destructor(void* x){
            T* obj=static_cast<T*>(x);
            delete obj;
        }

    private:
        pthread_key_t pkey_;   // key shared by all threads using this object
};
/*
 *  Thread pool
 */
class ThreadPool:noncopyable{
    public:
        typedef function<void()> Task;//      
        explicit ThreadPool(const string& name=string()):mutex_(),cond_(mutex_),name_(name),running_(false){
        }
        ~ThreadPool(){
            if(running_){
                stop();//               
            }
        }
        void start(int numThreads){
            assert(threads_.empty());
            running_=true;
            threads_.reserve(numThreads);
            for(int i=0;i<numThreads;i++){
                threads_.push_back(new Thread(bind(&ThreadPool::runInThread,this)));//      runInThread    
                threads_[i].start();
            }
        }
        void stop(){
            running_=false;//                 ,                    
            cond_.notifyALL();//           
            for_each(threads_.begin(),threads_.end(),bind(&Thread::join,_1));//        
        }
        void run(const Task& task){
            if(running_){//###4###               
                if(threads_.empty()){//      
                    task();
                }
                else{
                    MutexLockGuard guard(mutex_);//  RAII mutex      
                    queue_.push_back(task);
                    cond_.notify();
                }
            }
            else{
                printf("        
"); } } bool running(){// return running_; } private: void runInThread(){// while(running_){//###2### Task task(take()); if(task){//task NULL task(); } } } Task take(){ MutexLockGuard guard(mutex_); while(queue_.empty()&&running_){//###3### ###2### 。 , A ###2### , running_=false ,A ###3### , 。 ###4### cond_.wait();// } Task task; if(!queue_.empty()){ task=queue_.front(); queue_.pop_front(); } return task; } Mutex mutex_; Condition cond_; string name_; ptr_vector<Thread> threads_;// deque<Task> queue_; bool running_; }; /* * */ class test{ public: test(){ printf("test::constructor
"); } ~test(){ printf("test::deconsturctor
"); } }; void worker1(){ ThreadLocal<test> one; test two=one.value();// printf("worker1
"); } void worke2(){ printf("worker2
"); } void worker3(){ printf("worker3
"); sleep(1); } int main(){ Thread one(worker1); one.start(); one.join(); ThreadPool two; two.start(2); two.run(worke2); two.run(worker3); two.stop(); return 0; }

実行結果:
test::constructor
worker1                //単一スレッドの出力
test::deconsturctor    //以上、スレッド固有データはスレッド終了時に解放される
worker2                //スレッドプールの出力
worker3
ヘッダファイル Mutex.hpp:
#include<iostream>
#include<string>
#include<unistd.h>
#include<pthread.h>
#include<assert.h>
#include<boost/utility.hpp>
using namespace std;
using namespace boost;
/*
*   Mutex
*/
/// Non-copyable wrapper around a pthread_mutex_t with default attributes.
/// The mutex is initialised in the constructor and destroyed in the
/// destructor; it must not be locked when the object is destroyed.
class Mutex:noncopyable{
    public:
        Mutex(){
            pthread_mutex_init(&mutex,NULL);
        }
        /// Releases the resources held by the underlying mutex.
        ~Mutex(){
            pthread_mutex_destroy(&mutex);
        }
        /// Blocks until the calling thread owns the mutex.
        void lock(){
            pthread_mutex_lock(&mutex);
        }
        /// Releases the mutex.
        void unlock(){
            pthread_mutex_unlock(&mutex);
        }
        /// Exposes the native handle, e.g. for pthread_cond_wait().
        pthread_mutex_t& get(){
            return mutex;
        }
    private:
        pthread_mutex_t mutex;
};
/*
*   RAII
*/
/// Scope guard for Mutex: locks the mutex on construction and unlocks it
/// when the guard is destroyed.
class MutexLockGuard:noncopyable{
    public:
        /// @param mutex  mutex to hold for the guard's lifetime; the guard
        ///               keeps a reference, so the mutex must outlive it
        explicit MutexLockGuard(Mutex& mutex)
            :mutex_(mutex)
        {
            mutex.lock();
        }

        ~MutexLockGuard()
        {
            mutex_.unlock();
        }

    private:
        Mutex& mutex_;   // the guarded mutex (not owned)
};
/*
*   Condition variable
*/
/// Non-copyable wrapper around a pthread_cond_t that is bound to one Mutex.
class Condition:noncopyable{
    public:
        /// @param mutex  mutex passed to pthread_cond_wait() by wait(); held
        ///               by reference, so it must outlive this object
        explicit Condition(Mutex& mutex)
            :mutex_(mutex)
        {
            pthread_cond_init(&pcond_,NULL);
        }

        ~Condition()
        {
            pthread_cond_destroy(&pcond_);
        }

        /// Waits on the condition using the bound mutex's native handle.
        void wait()
        {
            pthread_mutex_t& native=mutex_.get();
            pthread_cond_wait(&pcond_,&native);
        }

        /// Signals the condition (pthread_cond_signal).
        void notify()
        {
            pthread_cond_signal(&pcond_);
        }

        /// Broadcasts the condition (pthread_cond_broadcast).
        void notifyALL()
        {
            pthread_cond_broadcast(&pcond_);
        }

    private:
        Mutex& mutex_;           // mutex associated with the condition (not owned)
        pthread_cond_t pcond_;   // native condition variable
};
/*
*   Countdown latch
*/
/// Counter-based latch: wait() blocks while the counter is positive and
/// countDown() decrements it, waking all waiters when it reaches zero.
class CountDownLatch{
    public:
        /// @param count  initial counter value
        CountDownLatch(int count)
            :mutex_(),
             condition_(mutex_),
             count_(count)
        {
        }

        /// Blocks the caller until the counter is no longer positive.
        void wait()
        {
            MutexLockGuard lock(mutex_);
            // Loop so that the counter is re-checked after every wake-up.
            while(count_>0){
                condition_.wait();
            }
        }

        /// Decrements the counter; wakes every waiter when it becomes zero.
        void countDown()
        {
            MutexLockGuard lock(mutex_);
            --count_;
            if(count_!=0){
                return;
            }
            condition_.notifyALL();
        }

    private:
        mutable Mutex mutex_;   // guards count_
        Condition condition_;   // broadcast when count_ reaches zero
        int count_;             // remaining count
};