[同時並行][pthread]_[スレッドプールのシンプルな設計と実装]
1.ネットワークファイルをダウンロードする必要がある場合、無制限のnewの新しいスレッドをダウンロードすると、破棄スレッドを作成する際のリソース消費が必然的に発生します.また、一部のネットワーク通信を処理する場合、無制限なスレッドを開くと、システムリソースの浪費が深刻になるに違いない.このとき,スレッドプールはスレッド多重化の問題をうまく解決できる.
2.ここでは5時間かけて簡単なスレッドプールを実現し、子供靴の参考にすることを目的としており、特定のタスクを満たすスレッドプールも非常に難しい技術ではありません.
3.サンプルプログラムは5スレッドのスレッドプールダウンロードファイルを開き、10回のダウンロードタスクを開始した.また、マルチスレッドのデバッグは実は面倒で、道理でそんなに多くの言語Scala、Erlangが自分の同時モデルを実現したわけだ.
4.間違いがあったら指摘してください.
5.boostベースなど、より完全なスレッドプール実装もあります.http://sourceforge.net/projects/threadpool/?source=directory.ただし、サードパーティ製ライブラリを導入することは、依存性と潜在的にデバッグしにくいBugを導入することであるため、サードパーティ製ライブラリコードを使用しない原則に慣れています.説明しない.
6.pthreadスレッドライブラリとcurlネットワークライブラリを使用します.
ファイル1.2(最終編集:20130507)
dh_thread_pool.h,dh_thread_pool.cpp
ファイル3,4:
dh_thread.h,dh_thread.cpp
ファイル5
dh_thread_task.h
ファイル6,7
dh_download_manager.h,dh_download_manager.cpp
ファイル8:
main.cpp
ヒント:プロジェクトファイルのダウンロード先:http://download.csdn.net/detail/infoworld/5138920
2.ここでは5時間かけて簡単なスレッドプールを実現し、子供靴の参考にすることを目的としており、特定のタスクを満たすスレッドプールも非常に難しい技術ではありません.
3.サンプルプログラムは5スレッドのスレッドプールダウンロードファイルを開き、10回のダウンロードタスクを開始した.また、マルチスレッドのデバッグは実は面倒で、道理でそんなに多くの言語Scala、Erlangが自分の同時モデルを実現したわけだ.
4.間違いがあったら指摘してください.
5.boostベースなど、より完全なスレッドプール実装もあります.http://sourceforge.net/projects/threadpool/?source=directory.ただし、サードパーティ製ライブラリを導入することは、依存性と潜在的にデバッグしにくいBugを導入することであるため、サードパーティ製ライブラリコードを使用しない原則に慣れています.説明しない.
6.pthreadスレッドライブラリとcurlネットワークライブラリを使用します.
ファイル1.2(最終編集:20130507)
dh_thread_pool.h,dh_thread_pool.cpp
/*
* dh_thread_pool.h
*
* Created on: 2013-3-13
* Author: Sai
*/
#ifndef DH_THREAD_POOL_H_
#define DH_THREAD_POOL_H_
#include <vector>
#include <queue>
#include "pthread.h"
#include "dh_thread_task.h"
class DhThread;
//1. .
class DhThreadPool
{
public:
DhThreadPool(size_t max_thread_number);
~DhThreadPool();
void AddAsynTask(TaskFunc task_func, void* userdata);
void Activate(); //1. . .
void Destroy(); //1. , , .
void WaitTaskFinishAndDestroy();
inline void SetWaitTime(unsigned milliseconds)
{
wait_time_ = milliseconds;
}
inline unsigned GetWaitTime()
{
return wait_time_;
}
inline size_t GetMaxThreadNumber()
{
return max_thread_number_;
}
void Execute(size_t *queue_remain_number,size_t *free_thread_number);
void AddFreeThreadToQueue(DhThread* thread);
bool is_destroy_; //1. .
bool is_wait_and_destroy_;
private:
static void* ScanTask(void* userdata);
void LockTaskQueue();
void UnlockTaskQueue();
void LockFreeThreadQueue();
void UnlockFreeThreadQueue();
size_t max_thread_number_; //1. .
unsigned wait_time_;
std::vector<DhThread*> threads_; //1.
std::queue<DhThread*> free_thread_que_; //1.
pthread_mutex_t free_thread_que_mutex_; //1.
std::queue<DhThreadTask*> task_que_; //1.
pthread_mutex_t task_que_mutex_; //1.
pthread_t task_que_thread_; //1. , 。
};
#endif /* DH_THREAD_POOL_H_ */
/*
* dh_thread_pool.cpp
*
* Created on: 2013-3-13
* Author: Sai
*/
#include "dh_thread_pool.h"
#include <iostream>
#include <stdio.h>
#include "dh_thread.h"
using namespace std;
DhThreadPool::DhThreadPool(size_t max_thread_number)
{
max_thread_number_ = max_thread_number;
is_destroy_ = false;
is_wait_and_destroy_ = false;
task_que_mutex_ = NULL;
free_thread_que_mutex_ = NULL;
wait_time_ = 500;
pthread_mutex_init(&task_que_mutex_, NULL);
pthread_mutex_init(&free_thread_que_mutex_, NULL);
}
DhThreadPool::~DhThreadPool()
{
cout << "~DhThreadPool: " << endl;
pthread_mutex_destroy(&task_que_mutex_);
pthread_mutex_destroy(&free_thread_que_mutex_);
}
void DhThreadPool::LockFreeThreadQueue()
{
// cout << "LockFreeThreadQueue begin: " << endl;
pthread_mutex_lock(&free_thread_que_mutex_);
// cout << "LockFreeThreadQueue end: " << endl;
}
void DhThreadPool::UnlockFreeThreadQueue()
{
// cout << "UnlockFreeThreadQueue begin: " << endl;
pthread_mutex_unlock(&free_thread_que_mutex_);
// cout << "UnlockFreeThreadQueue end: " << endl;
}
void DhThreadPool::LockTaskQueue()
{
pthread_mutex_lock(&task_que_mutex_);
}
void DhThreadPool::UnlockTaskQueue()
{
pthread_mutex_unlock(&task_que_mutex_);
}
void DhThreadPool::AddFreeThreadToQueue(DhThread* thread)
{
//1. .
LockFreeThreadQueue();
free_thread_que_.push(thread);
UnlockFreeThreadQueue();
}
void DhThreadPool::Execute(size_t *queue_remain_number,
size_t *free_thread_number)
{
// cout << "DhThreadPool::Execute begin" << endl;
LockFreeThreadQueue();
//1. , .
//2. . .
if (!free_thread_que_.empty())
{
LockTaskQueue();
if (!task_que_.empty())
{
DhThreadTask* task = task_que_.front();
task_que_.pop();
//1. .
DhThread* free_thread = free_thread_que_.front();
free_thread_que_.pop();
free_thread->task_ = task;
//2. .
free_thread->Notify();
*queue_remain_number = task_que_.size();
}
else
{
*queue_remain_number = 0;
}
UnlockTaskQueue();
*free_thread_number = free_thread_que_.size();
}
else
{
*free_thread_number = 0;
}
UnlockFreeThreadQueue();
}
void* DhThreadPool::ScanTask(void* userdata)
{
DhThreadPool* pool = (DhThreadPool*) userdata;
size_t queue_number = 0;
size_t free_thread_number = 0;
while (true)
{
if (pool->is_destroy_)
{
break;
}
pool->Execute(&queue_number, &free_thread_number);
if (pool->is_wait_and_destroy_ && !queue_number
&& free_thread_number == pool->GetMaxThreadNumber())
{
pool->is_destroy_ = true;
continue;
}
Sleep(pool->GetWaitTime());
}
cout << "DhThreadPool::ScanTask end" << endl;
return NULL;
}
void DhThreadPool::Activate()
{
//1. .
for (int i = 0; i < max_thread_number_; ++i)
{
DhThread* thread = new DhThread(this);
threads_.push_back(thread);
thread->Start();
}
//1. .
pthread_create(&task_que_thread_, NULL, &ScanTask, this);
}
void DhThreadPool::WaitTaskFinishAndDestroy()
{
is_wait_and_destroy_ = true;
pthread_join(task_que_thread_, NULL);
//1. .
size_t size = threads_.size();
for (size_t i = 0; i < size; ++i)
{
DhThread* thread = threads_[i];
thread->Destroy();
cout << "thread->Destroy()" << endl;
delete thread;
}
}
void DhThreadPool::Destroy()
{
//1. .
cout << "DhThreadPool::Destroy begin" << endl;
//1.
is_destroy_ = true;
pthread_join(task_que_thread_, NULL);
//1. .
size_t size = threads_.size();
for (size_t i = 0; i < size; ++i)
{
DhThread* thread = threads_[i];
thread->Destroy();
cout << "thread->Destroy()" << endl;
delete thread;
}
size_t remain = task_que_.size();
for (size_t i = 0; i < remain; ++i)
{
DhThreadTask* task = task_que_.front();
task_que_.pop();
delete task;
}
cout << "DhThreadPool::Destroy end " << endl;
}
void DhThreadPool::AddAsynTask(TaskFunc task_func, void* userdata)
{
DhThreadTask *task = new DhThreadTask(task_func, userdata);
LockTaskQueue();
//1. .
task_que_.push(task);
UnlockTaskQueue();
}
ファイル3,4:
dh_thread.h,dh_thread.cpp
/*
* dh_thread.h
*
* Created on: 2013-3-13
* Author: Sai
*/
#ifndef DH_THREAD_H_
#define DH_THREAD_H_
#include "pthread.h"
#include "dh_thread_task.h"
class DhThreadPool;
//1. . .
class DhThread
{
public:
DhThread(DhThreadPool* pool);
~DhThread();
DhThreadTask* task_;
void AddToFreeThreadQueue();
void Notify();
void Lock();
void Unlock();
void Wait();
void Join();
int GetId();
void Destroy();
void Start();
private:
pthread_mutex_t mutex_;
pthread_cond_t cond_;
bool is_destroy_;
pthread_t thread_;
DhThreadPool* pool_; //1.
static void* DoTask(void* userdata);
int thread_id_;
};
#endif /* DH_THREAD_H_ */
/*
* dh_thread.cpp
*
* Created on: 2013-3-13
* Author: Sai
*/
#include "dh_thread.h"
#include <stdio.h>
#include <iostream>
#include "dh_thread_pool.h"
using namespace std;
DhThread::DhThread(DhThreadPool* pool)
{
task_ = NULL;
is_destroy_ = false;
mutex_ = NULL;
cond_ = NULL;
pool_ = pool;
thread_id_ = 0;
pthread_mutex_init(&mutex_, NULL);
pthread_cond_init(&cond_, NULL);
}
void DhThread::Start()
{
pthread_create(&thread_, NULL, &DhThread::DoTask, this);
thread_id_ = (int)thread_.p;
}
DhThread::~DhThread()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
void DhThread::Lock()
{
pthread_mutex_lock(&mutex_);
}
void DhThread::Unlock()
{
pthread_mutex_unlock(&mutex_);
}
void DhThread::AddToFreeThreadQueue()
{
pool_->AddFreeThreadToQueue(this);
}
void DhThread::Wait()
{
pthread_cond_wait(&cond_, &mutex_);
}
void DhThread::Join()
{
int res = pthread_join(thread_, NULL);
cout << "res: " << res << endl;
}
void DhThread::Destroy()
{
Lock();
is_destroy_ = true;
Notify();
Unlock();
Join();
}
int DhThread::GetId()
{
return thread_id_;
}
void* DhThread::DoTask(void* userdata)
{
DhThread* thread = (DhThread*) userdata;
while (true)
{
thread->Lock();
if (thread->is_destroy_)
{
thread->Unlock();
break;
}
thread->Unlock();
DhThreadTask* task = thread->task_;
if (task)
{
(*task->task_func_)(task->userdata_);
cout << "task finish: " << thread->GetId() << endl;
delete task;
thread->task_ = NULL;
}
//1. .
//2. .
//1. , .
thread->Lock();
if (thread->is_destroy_)
{
thread->Unlock();
break;
}
thread->AddToFreeThreadQueue();
thread->Wait();
thread->Unlock();
}
cout << "thread finish: " << thread->GetId() << endl;
return NULL;
}
void DhThread::Notify()
{
pthread_cond_signal(&cond_);
}
ファイル5
dh_thread_task.h
/*
* dh_thread_task.h
*
* Created on: 2013-3-13
* Author: Sai
*/
#ifndef DH_THREAD_TASK_H_
#define DH_THREAD_TASK_H_
typedef void * (*TaskFunc)(void * arg);
class DhThreadTask
{
public:
DhThreadTask(TaskFunc task_func, void* userdata)
{
userdata_ = userdata;
task_func_ = task_func;
}
~DhThreadTask()
{
}
void* userdata_;
TaskFunc task_func_;
};
#endif /* DH_THREAD_TASK_H_ */
ファイル6,7
dh_download_manager.h,dh_download_manager.cpp
/*
* dh_download_manager.h
*
* Created on: 2013-3-13
* Author: Sai
*/
#ifndef DH_DOWNLOAD_MANAGER_H_
#define DH_DOWNLOAD_MANAGER_H_
#include <stdlib.h>
typedef size_t (*WriteMemoryCallback)(void *contents, size_t size, size_t nmemb,
void *userp);
class DhDownloadManager
{
public:
DhDownloadManager();
~DhDownloadManager();
int Process(const char* url, WriteMemoryCallback callback_func,
void* userdata);
};
#endif /* DH_DOWNLOAD_MANAGER_H_ */
/*
* dh_download_manager.cpp
*
* Created on: 2013-3-13
* Author: Sai
*/
#include "dh_download_manager.h"
#include <stdlib.h>
#include "curl/curl.h"
DhDownloadManager::DhDownloadManager()
{
curl_global_init (CURL_GLOBAL_ALL);
}
DhDownloadManager::~DhDownloadManager()
{
curl_global_cleanup();
}
int DhDownloadManager::Process(const char* url,
WriteMemoryCallback callback_func, void* userdata)
{
CURL *curl_handle = curl_easy_init();
curl_easy_setopt(curl_handle, CURLOPT_URL, url);
curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, callback_func);
curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, userdata);
curl_easy_setopt(curl_handle, CURLOPT_USERAGENT, "test-agent/1.0");
// curl_easy_setopt(curl_handle, CURLOPT_PROXY, "192.168.0.1:808");
// curl_easy_setopt(curl_handle, CURLOPT_HTTPPROXYTUNNEL, 1L);
int ret = curl_easy_perform(curl_handle);
curl_easy_cleanup(curl_handle);
return 0;
}
ファイル8:
main.cpp
/*
* main.cpp
*
* Created on: 2013-3-13
* Author: Sai
*/
#include <stdio.h>
#include <string.h>
#include <iostream>
using namespace std;
#include "dh_thread_pool.h"
#include "dh_thread_task.h"
#include "dh_download_manager.h"
typedef struct DiskData
{
FILE* file;
}DiskData;
size_t WriteToDisk(void *contents, size_t size, size_t nmemb,
void *userp)
{
DiskData* dd = (DiskData*) userp;
size_t number = nmemb*size;
size_t writed_num = fwrite(contents,1,number,dd->file);
return writed_num;
}
void *RunTaskFunc(void * arg)
{
int* i = (int*) arg;
cout << "thread index: " << *i << endl;
DhDownloadManager* manager = new DhDownloadManager();
static const char* url =
"http://www.istonsoft.com/downloads/iston-video-converter.exe";
DiskData dd;
char path[8];
memset(path,0,sizeof(path));
sprintf(path,"%d.exe",*i);
dd.file = fopen(path,"wb");
manager->Process(url,&WriteToDisk,&dd);
fclose(dd.file);
return NULL;
}
int main(int argc, char *argv[])
{
setbuf(stdout, (char*) NULL);
setbuf(stderr, (char*) NULL);
printf("Hello, world
");
DhThreadPool *pool = new DhThreadPool(5);
pool->Activate();
for (int o = 0; o < 10; ++o)
{
int *i = new int;
*i = o;
pool->AddAsynTask(&RunTaskFunc, i);
}
getchar();
pool->Destroy();
delete pool;
return 0;
}
ヒント:プロジェクトファイルのダウンロード先:http://download.csdn.net/detail/infoworld/5138920