C++パッケージングPOSIXスレッドライブラリ(六)スレッドプール
5386 ワード
スレッドプールには2つの主要コンポーネントがあります.
1.threads 2.blocking queue
解決される問題は、スレッドの作成と破棄に伴うオーバーヘッドと、スレッドプールを介した擬似非同期プロセスの実現が、単純な生産者消費者問題と類似していることです(詳細はwikiを参照).
Blocking Queue
ユーザーに対応してタスクCallBackとスレッドを追加してタスクを2つ実行するため、2つの条件変数と1つの反発ロックが必要です.
ユーザーが生産者にタスクを追加する場合、スレッドはタスクを消費者に比べます.2つの条件変数が必要で解決します
1.生産者>消費者(「爆発防止」タスクキュー)2.生産者
Threads
スレッドプールはスレッドのセットからなり、構築フェーズはスレッドのセットを作成して保留し、タスクを追加するたびにnotifyスレッドが実行する...スレッドプールが解析されると、これらのスレッドを回収する...ここには大きな穴が...
コード#コード#
完全なコード:https://github.com/NearXdu/ThreadPool
1.threads 2.blocking queue
解決される問題は、スレッドの作成と破棄に伴うオーバーヘッドと、スレッドプールを介した擬似非同期プロセスの実現が、単純な生産者消費者問題と類似していることです(詳細はwikiを参照).
Blocking Queue
ユーザーに対応してタスクCallBackとスレッドを追加してタスクを2つ実行するため、2つの条件変数と1つの反発ロックが必要です.
ユーザーが生産者にタスクを追加する場合、スレッドはタスクを消費者に比べます.2つの条件変数が必要で解決します
1.生産者>消費者(「爆発防止」タスクキュー)2.生産者
Threads
スレッドプールはスレッドのセットからなり、構築フェーズはスレッドのセットを作成して保留し、タスクを追加するたびにnotifyスレッドが実行する...スレッドプールが解析されると、これらのスレッドを回収する...ここには大きな穴が...
コード#コード#
完全なコード:https://github.com/NearXdu/ThreadPool
#ifndef __THREADPOLL_H__
#define __THREADPOLL_H__
#include "MutexLock.h"
#include "Condition.h"
#include "Thread.h"
#include
#include
#include
#include
#include
class ThreadPool : boost::noncopyable
{
public:
typedef boost::function<void()>Task;
ThreadPool(size_t qsize,size_t nthreads);
~ThreadPool();
void start();
void stop();
void addTask(Task t);
Task getTask();
bool isStarted()const{return isStarted_;}
void runInThread();
private:
mutable MutexLock mutex_;
Condition notEmpty_;//blocking queue full
Condition notFull_;//blocking queue empty
size_t queueSize_;//
std::queue queue_;//
size_t threadsNum_;//
boost::ptr_vector threads_;//
volatile bool isStarted_;
};
#endif
///
#include "ThreadPool.h"
ThreadPool::ThreadPool(size_t q,size_t n)
:mutex_(),
notEmpty_(mutex_),
notFull_(mutex_),
queueSize_(q),
threadsNum_(n),
isStarted_(false)
{}
ThreadPool::~ThreadPool()
{
if(isStarted_)
{
stop();
}
}
void ThreadPool::stop()
{
{
MutexLockGuard lock(mutex_);
isStarted_ = false;
notEmpty_.notifyAll(); //
}
for_each(threads_.begin(),
threads_.end(),
boost::bind(&Thread::join, _1));
}
void ThreadPool::addTask(Task t)
{
if(threads_.empty())
{
t();
}
else
{
MutexLockGuard lock(mutex_);
while(queue_.size()>=queueSize_&&isStarted_)
notFull_.wait();
if(!isStarted_)
return ;
queue_.push(std::move(t));
notEmpty_.notify();
}
}
ThreadPool::Task ThreadPool::getTask()
{
MutexLockGuard lock(mutex_);
while(queue_.empty()&&isStarted_)
{
notEmpty_.wait();
}
Task t;
if(!queue_.empty())
{
assert(!queue_.empty());
t=queue_.front();
queue_.pop();
notFull_.notify();
}
return t;
}
void ThreadPool::runInThread()
{
while(isStarted_)
{
// std::cout<
Task t(getTask());//will blocked until "add" ops
//assert(t!=NULL);
if(t)
{
t();//run
}
}
}
void ThreadPool::start()
{
isStarted_=true;
threads_.reserve(threadsNum_);
for(size_t i=0;inew Thread(boost::bind(
&ThreadPool::runInThread,this)));
threads_[i].start();
}
}