C++パッケージングPOSIXスレッドライブラリ(六)スレッドプール


スレッドプールには2つの主要コンポーネントがあります.
1.threads 2.blocking queue
解決される問題は、スレッドの作成と破棄に伴うオーバーヘッドと、スレッドプールを介した擬似非同期プロセスの実現が、単純な生産者消費者問題と類似していることです(詳細はwikiを参照).
Blocking Queue
ユーザーに対応してタスクCallBackとスレッドを追加してタスクを2つ実行するため、2つの条件変数と1つの反発ロックが必要です.
ユーザーが生産者にタスクを追加する場合、スレッドはタスクを消費者に比べます.2つの条件変数が必要で解決します
1.生産者>消費者(「爆発防止」タスクキュー)2.生産者
Threads
スレッドプールはスレッドのセットからなり、構築フェーズはスレッドのセットを作成して保留し、タスクを追加するたびにnotifyスレッドが実行する...スレッドプールが解析されると、これらのスレッドを回収する...ここには大きな穴が...
コード#コード#
完全なコード:https://github.com/NearXdu/ThreadPool
#ifndef __THREADPOLL_H__
#define __THREADPOLL_H__
#include "MutexLock.h"
#include "Condition.h"
#include "Thread.h"
#include 
#include 
#include 
#include 
#include 
class ThreadPool : boost::noncopyable
{
    public:
    typedef boost::function<void()>Task;
    ThreadPool(size_t qsize,size_t nthreads);
    ~ThreadPool();
    void start();
    void stop();
    void addTask(Task t);
    Task getTask();

    bool isStarted()const{return isStarted_;}
    void runInThread();

    private:
    mutable MutexLock mutex_;
    Condition notEmpty_;//blocking queue  full    
    Condition notFull_;//blocking queue empty     
    size_t queueSize_;//      
    std::queue queue_;//    

    size_t threadsNum_;//    
    boost::ptr_vector threads_;//       
    volatile bool isStarted_;

};
#endif


///
#include "ThreadPool.h"
ThreadPool::ThreadPool(size_t q,size_t n)
    :mutex_(),
     notEmpty_(mutex_),
     notFull_(mutex_),
     queueSize_(q),
     threadsNum_(n),
     isStarted_(false)
{}
ThreadPool::~ThreadPool()
{
    if(isStarted_)
    {
    stop();
    }
}


void ThreadPool::stop()
{   
    {
    MutexLockGuard lock(mutex_);
    isStarted_ = false;
    notEmpty_.notifyAll(); //       

    }
    for_each(threads_.begin(),
        threads_.end(),
        boost::bind(&Thread::join, _1));

}
void ThreadPool::addTask(Task t)
{
    if(threads_.empty())
    {
    t();
    }
    else
    {

    MutexLockGuard lock(mutex_);
    while(queue_.size()>=queueSize_&&isStarted_)
        notFull_.wait();
    if(!isStarted_)
        return ;

    queue_.push(std::move(t));
    notEmpty_.notify();
    }
}

ThreadPool::Task ThreadPool::getTask()
{
    MutexLockGuard lock(mutex_);
    while(queue_.empty()&&isStarted_)
    {
    notEmpty_.wait();
    }
    Task t;
    if(!queue_.empty())
    {
    assert(!queue_.empty());
    t=queue_.front();
    queue_.pop();
    notFull_.notify();
    }

    return t;
}

void ThreadPool::runInThread()
{
    while(isStarted_)
    {
    //  std::cout<
    Task t(getTask());//will blocked until "add" ops
    //assert(t!=NULL);
    if(t)
    {
        t();//run
    }
    }
}

void ThreadPool::start()
{
    isStarted_=true;
    threads_.reserve(threadsNum_);
    for(size_t i=0;inew Thread(boost::bind(
            &ThreadPool::runInThread,this)));

    threads_[i].start();
    }

}