C++スレッドプール実戦
7122 ワード
スレッドプールの定義
ここの池はイメージの言い方です.スレッドプールには、作成されたスレッドが山積みされており、最初は空き待機状態です.新しいタスクを処理する必要がある場合は、このスレッド(スレッドプールにたとえる)から待機中のスレッドを取り出してタスクを処理し、タスクが処理された後、スレッドをプールに戻し(一般的にはスレッドの状態をアイドルに設定する)、後続のタスクが使用されるようにします.プール内のスレッドがすべてビジー状態で、スレッドプールに使用可能な空き待機スレッドがない場合、必要に応じて新しいスレッドを作成してプールに配置するか、タスクの現在のスレッドプール内のすべてのスレッドがビジーであることを通知し、しばらく待ってから試してください.
二スレッドプールを使用する理由
スレッドの作成または破棄は、プロセスの作成および破棄に比べて軽量です(オーバーヘッドはプロセスほど大きくありません).しかし、私たちのタスクで大量のスレッドの作成および破棄操作が必要な場合、これらのオーバーヘッドを組み合わせると大きくなります.スレッドプールの利点は、スレッド多重化であり、あるスレッドは、1つのタスクを処理した後、破棄せずに次のタスクを処理し続けることができ、無駄なオーバーヘッドを回避できるため、大量の同時タスクを連続的に発生させる場合に特に適しています.
三実戦
1 thread_pool.h
2 thread_pool.cpp
3 test.cpp
四運転結果
ここの池はイメージの言い方です.スレッドプールには、作成されたスレッドが山積みされており、最初は空き待機状態です.新しいタスクを処理する必要がある場合は、このスレッド(スレッドプールにたとえる)から待機中のスレッドを取り出してタスクを処理し、タスクが処理された後、スレッドをプールに戻し(一般的にはスレッドの状態をアイドルに設定する)、後続のタスクが使用されるようにします.プール内のスレッドがすべてビジー状態で、スレッドプールに使用可能な空き待機スレッドがない場合、必要に応じて新しいスレッドを作成してプールに配置するか、タスクの現在のスレッドプール内のすべてのスレッドがビジーであることを通知し、しばらく待ってから試してください.
二スレッドプールを使用する理由
スレッドの作成または破棄は、プロセスの作成および破棄に比べて軽量です(オーバーヘッドはプロセスほど大きくありません).しかし、私たちのタスクで大量のスレッドの作成および破棄操作が必要な場合、これらのオーバーヘッドを組み合わせると大きくなります.スレッドプールの利点は、スレッド多重化であり、あるスレッドは、1つのタスクを処理した後、破棄せずに次のタスクを処理し続けることができ、無駄なオーバーヘッドを回避できるため、大量の同時タスクを連続的に発生させる場合に特に適しています.
三実戦
1 thread_pool.h
#ifndef __THREAD_POOL_H
#define __THREAD_POOL_H
#include
#include
#include
using namespace std;
/* : */
class CTask {
protected:
string m_strTaskName; //
void* m_ptrData; //
public:
CTask() = default;
CTask(string &taskName)
: m_strTaskName(taskName)
, m_ptrData(NULL) {}
virtual int Run() = 0;
void setData(void* data); //
virtual ~CTask() {}
};
/* */
class CThreadPool {
private:
static vector m_vecTaskList; //
static bool shutdown; //
int m_iThreadNum; //
pthread_t *pthread_id; //
static pthread_mutex_t m_pthreadMutex; //
static pthread_cond_t m_pthreadCond; //
protected:
static void* ThreadFunc(void *threadData); //
static int MoveToIdle(pthread_t tid); // ,
static int MoveToBusy(pthread_t tid); //
int Create(); //
public:
CThreadPool(int threadNum);
int AddTask(CTask *task); //
int StopAll(); //
int getTaskSize(); //
};
#endif
2 thread_pool.cpp
#include "thread_pool.h"
#include
void CTask::setData(void* data) {
m_ptrData = data;
}
//
vector CThreadPool::m_vecTaskList;
bool CThreadPool::shutdown = false;
pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;
//
CThreadPool::CThreadPool(int threadNum) {
this->m_iThreadNum = threadNum;
printf("I will create %d threads.
", threadNum);
Create();
}
//
void* CThreadPool::ThreadFunc(void* threadData) {
pthread_t tid = pthread_self();
while (1)
{
pthread_mutex_lock(&m_pthreadMutex);
// ,
while (m_vecTaskList.size() == 0 && !shutdown)
pthread_cond_wait(&m_pthreadCond, &m_pthreadMutex);
//
if (shutdown)
{
pthread_mutex_unlock(&m_pthreadMutex);
printf("[tid: %lu]\texit
", pthread_self());
pthread_exit(NULL);
}
printf("[tid: %lu]\trun: ", tid);
vector::iterator iter = m_vecTaskList.begin();
//
CTask* task = *iter;
if (iter != m_vecTaskList.end())
{
task = *iter;
m_vecTaskList.erase(iter);
}
pthread_mutex_unlock(&m_pthreadMutex);
task->Run(); //
printf("[tid: %lu]\tidle
", tid);
}
return (void*)0;
}
//
int CThreadPool::AddTask(CTask *task) {
pthread_mutex_lock(&m_pthreadMutex);
m_vecTaskList.push_back(task);
pthread_mutex_unlock(&m_pthreadMutex);
pthread_cond_signal(&m_pthreadCond);
return 0;
}
//
int CThreadPool::Create() {
pthread_id = new pthread_t[m_iThreadNum];
for (int i = 0; i < m_iThreadNum; i++)
pthread_create(&pthread_id[i], NULL, ThreadFunc, NULL);
return 0;
}
//
int CThreadPool::StopAll() {
//
if (shutdown)
return -1;
printf("Now I will end all threads!
");
// ,
shutdown = true;
pthread_cond_broadcast(&m_pthreadCond);
//
for (int i = 0; i < m_iThreadNum; i++)
pthread_join(pthread_id[i], NULL);
delete[] pthread_id;
pthread_id = NULL;
//
pthread_mutex_destroy(&m_pthreadMutex);
pthread_cond_destroy(&m_pthreadCond);
return 0;
}
//
int CThreadPool::getTaskSize() {
return m_vecTaskList.size();
}
3 test.cpp
#include "thread_pool.h"
#include
#include
#include
class CMyTask:public CTask{
public:
CMyTask()=default;
int Run(){
printf("%s
",(char*)m_ptrData);
int x =rand()%4+1;
sleep(x);
return 0;
}
~CMyTask(){}
};
int main(int argc, const char* argv[])
{
CMyTask taskobj;
char szTmp[]="hello!";
taskobj.setData((void*)szTmp);
CThreadPool threadpool(6); // 6
for(int i=0;i<10;i++)
threadpool.AddTask(&taskobj);
while(1){
printf("There are still %d tasks need to handle
",threadpool.getTaskSize());
//
if(threadpool.getTaskSize()==0)
{
//
if(threadpool.StopAll()==-1)
{
printf("Thread pool clear,exit
");
exit(0);
}
}
sleep(2);
printf("2 seconds later...
");
}
return 0;
}
四運転結果
[root@localhost test]# g++ -o test test.cpp thread_pool.cpp -lpthread -std=c++11
[root@localhost test]# ./test
I will create 6 threads.
There are still 10 tasks need to handle
[tid: 139862531479296] run: hello!
[tid: 139862523086592] run: hello!
[tid: 139862565050112] run: hello!
[tid: 139862556657408] run: hello!
[tid: 139862548264704] run: hello!
[tid: 139862539872000] run: hello!
2 seconds later...
There are still 4 tasks need to handle
[tid: 139862565050112] idle
[tid: 139862565050112] run: hello!
[tid: 139862548264704] idle
[tid: 139862548264704] run: hello!
[tid: 139862523086592] idle
[tid: 139862523086592] run: hello!
[tid: 139862548264704] idle
[tid: 139862548264704] run: hello!
[tid: 139862539872000] idle
2 seconds later...
There are still 0 tasks need to handle
Now I will end all threads!
[tid: 139862531479296] idle
[tid: 139862531479296] exit
[tid: 139862556657408] idle
[tid: 139862556657408] exit
[tid: 139862539872000] exit
[tid: 139862565050112] idle
[tid: 139862565050112] exit
[tid: 139862523086592] idle
[tid: 139862523086592] exit
[tid: 139862548264704] idle
[tid: 139862548264704] exit
2 seconds later...
There are still 0 tasks need to handle
Thread pool clear,exit