[同時並行][pthread]_[スレッドプールのシンプルな設計と実装]


1.ネットワークファイルをダウンロードする必要がある場合、無制限のnewの新しいスレッドをダウンロードすると、破棄スレッドを作成する際のリソース消費が必然的に発生します.また、一部のネットワーク通信を処理する場合、無制限なスレッドを開くと、システムリソースの浪費が深刻になるに違いない.このとき,スレッドプールはスレッド多重化の問題をうまく解決できる.
2.ここでは5時間かけて簡単なスレッドプールを実現し、子供靴の参考にすることを目的としており、特定のタスクを満たすスレッドプールも非常に難しい技術ではありません.
3.サンプルプログラムは5スレッドのスレッドプールダウンロードファイルを開き、10回のダウンロードタスクを開始した.また、マルチスレッドのデバッグは実は面倒で、道理でそんなに多くの言語Scala、Erlangが自分の同時モデルを実現したわけだ.
4.間違いがあったら指摘してください.
5.boostベースなど、より完全なスレッドプール実装もあります.http://sourceforge.net/projects/threadpool/?source=directory.ただし、サードパーティ製ライブラリを導入することは、依存性と潜在的にデバッグしにくいBugを導入することであるため、サードパーティ製ライブラリコードを使用しない原則に慣れています.説明しない.
6.pthreadスレッドライブラリとcurlネットワークライブラリを使用します.
ファイル1.2(最終編集:20130507)
dh_thread_pool.h,dh_thread_pool.cpp
/*
 * dh_thread_pool.h
 *
 *  Created on: 2013-3-13
 *  Author: Sai
 */

#ifndef DH_THREAD_POOL_H_
#define DH_THREAD_POOL_H_

#include <vector>
#include <queue>
#include "pthread.h"

#include "dh_thread_task.h"

class DhThread;

//1.            .
class DhThreadPool
{
public:
    DhThreadPool(size_t max_thread_number);
    ~DhThreadPool();
    void AddAsynTask(TaskFunc task_func, void* userdata);
    void Activate(); //1.     .      .
    void Destroy(); //1.     ,           ,      .
    void WaitTaskFinishAndDestroy();

    inline void SetWaitTime(unsigned milliseconds)
    {
        wait_time_ = milliseconds;
    }
    inline unsigned GetWaitTime()
    {
        return wait_time_;
    }
    inline size_t GetMaxThreadNumber()
    {
        return max_thread_number_;
    }

    void Execute(size_t *queue_remain_number,size_t *free_thread_number);
    void AddFreeThreadToQueue(DhThread* thread);
    bool is_destroy_; //1.      .
    bool is_wait_and_destroy_;

private:
    static void* ScanTask(void* userdata);

    void LockTaskQueue();
    void UnlockTaskQueue();

    void LockFreeThreadQueue();
    void UnlockFreeThreadQueue();

    size_t max_thread_number_; //1.          .
    unsigned wait_time_;

    std::vector<DhThread*> threads_; //1.     
    std::queue<DhThread*> free_thread_que_; //1.      
    pthread_mutex_t free_thread_que_mutex_; //1.        

    std::queue<DhThreadTask*> task_que_; //1.    
    pthread_mutex_t task_que_mutex_; //1.        
    pthread_t task_que_thread_; //1.      ,               。
};

#endif /* DH_THREAD_POOL_H_ */


/*
 * dh_thread_pool.cpp
 *
 *  Created on: 2013-3-13
 *  Author: Sai
 */

#include "dh_thread_pool.h"

#include <iostream>
#include <stdio.h>
#include "dh_thread.h"
using namespace std;

DhThreadPool::DhThreadPool(size_t max_thread_number)
{
    max_thread_number_ = max_thread_number;
    is_destroy_ = false;
    is_wait_and_destroy_ = false;
    task_que_mutex_ = NULL;
    free_thread_que_mutex_ = NULL;
    wait_time_ = 500;

    pthread_mutex_init(&task_que_mutex_, NULL);
    pthread_mutex_init(&free_thread_que_mutex_, NULL);

}

DhThreadPool::~DhThreadPool()
{
    cout << "~DhThreadPool: " << endl;
    pthread_mutex_destroy(&task_que_mutex_);
    pthread_mutex_destroy(&free_thread_que_mutex_);
}

void DhThreadPool::LockFreeThreadQueue()
{
//    cout << "LockFreeThreadQueue begin: " << endl;
    pthread_mutex_lock(&free_thread_que_mutex_);
//    cout << "LockFreeThreadQueue end: " << endl;
}

void DhThreadPool::UnlockFreeThreadQueue()
{
//    cout << "UnlockFreeThreadQueue begin: " << endl;
    pthread_mutex_unlock(&free_thread_que_mutex_);
//    cout << "UnlockFreeThreadQueue end: " << endl;
}

void DhThreadPool::LockTaskQueue()
{
    pthread_mutex_lock(&task_que_mutex_);
}

void DhThreadPool::UnlockTaskQueue()
{
    pthread_mutex_unlock(&task_que_mutex_);
}

void DhThreadPool::AddFreeThreadToQueue(DhThread* thread)
{
    //1.       .
    LockFreeThreadQueue();
    free_thread_que_.push(thread);
    UnlockFreeThreadQueue();
}

void DhThreadPool::Execute(size_t *queue_remain_number,
        size_t *free_thread_number)
{
//    cout << "DhThreadPool::Execute  begin" << endl;
    LockFreeThreadQueue();
    //1.       ,             .
    //2.    .    .
    if (!free_thread_que_.empty())
    {
        LockTaskQueue();
        if (!task_que_.empty())
        {
            DhThreadTask* task = task_que_.front();
            task_que_.pop();

            //1.    .
            DhThread* free_thread = free_thread_que_.front();
            free_thread_que_.pop();
            free_thread->task_ = task;
            //2.        .
            free_thread->Notify();

            *queue_remain_number = task_que_.size();
        }
        else
        {
            *queue_remain_number = 0;
        }
        UnlockTaskQueue();

        *free_thread_number = free_thread_que_.size();
    }
    else
    {
        *free_thread_number = 0;
    }
    UnlockFreeThreadQueue();
}

void* DhThreadPool::ScanTask(void* userdata)
{
    DhThreadPool* pool = (DhThreadPool*) userdata;
    size_t queue_number = 0;
    size_t free_thread_number = 0;
    while (true)
    {
        if (pool->is_destroy_)
        {
            break;
        }
        pool->Execute(&queue_number, &free_thread_number);
        if (pool->is_wait_and_destroy_ && !queue_number
                && free_thread_number == pool->GetMaxThreadNumber())
        {
            pool->is_destroy_ = true;
            continue;
        }
        Sleep(pool->GetWaitTime());
    }

    cout << "DhThreadPool::ScanTask end" << endl;
    return NULL;
}

void DhThreadPool::Activate()
{
    //1.         .
    for (int i = 0; i < max_thread_number_; ++i)
    {
        DhThread* thread = new DhThread(this);
        threads_.push_back(thread);
        thread->Start();
    }

    //1.      .
    pthread_create(&task_que_thread_, NULL, &ScanTask, this);
}

void DhThreadPool::WaitTaskFinishAndDestroy()
{
    is_wait_and_destroy_ = true;
    pthread_join(task_que_thread_, NULL);

    //1.       .
    size_t size = threads_.size();
    for (size_t i = 0; i < size; ++i)
    {
        DhThread* thread = threads_[i];
        thread->Destroy();
        cout << "thread->Destroy()" << endl;
        delete thread;
    }
}

void DhThreadPool::Destroy()
{
    //1.        .
    cout << "DhThreadPool::Destroy begin" << endl;

    //1.       
    is_destroy_ = true;
    pthread_join(task_que_thread_, NULL);

    //1.       .
    size_t size = threads_.size();
    for (size_t i = 0; i < size; ++i)
    {
        DhThread* thread = threads_[i];
        thread->Destroy();
        cout << "thread->Destroy()" << endl;
        delete thread;
    }

    size_t remain = task_que_.size();
    for (size_t i = 0; i < remain; ++i)
    {
        DhThreadTask* task = task_que_.front();
        task_que_.pop();
        delete task;
    }

    cout << "DhThreadPool::Destroy end " << endl;
}

void DhThreadPool::AddAsynTask(TaskFunc task_func, void* userdata)
{
    DhThreadTask *task = new DhThreadTask(task_func, userdata);

    LockTaskQueue();
    //1.       .
    task_que_.push(task);

    UnlockTaskQueue();
}




ファイル3,4:
dh_thread.h,dh_thread.cpp
/*
 * dh_thread.h
 *
 *  Created on: 2013-3-13
 *  Author: Sai
 */

#ifndef DH_THREAD_H_
#define DH_THREAD_H_

#include "pthread.h"
#include "dh_thread_task.h"

class DhThreadPool;

//1.           .        .
class DhThread
{
public:
	DhThread(DhThreadPool* pool);
	~DhThread();

	DhThreadTask* task_;
	void AddToFreeThreadQueue();
	void Notify();

	void Lock();
	void Unlock();
	void Wait();
	void Join();
	int GetId();
	void Destroy();

	void Start();

private:
	pthread_mutex_t mutex_;
	pthread_cond_t cond_;

	bool is_destroy_;
	pthread_t thread_;
	DhThreadPool* pool_; //1.     
	static void* DoTask(void* userdata);

	int thread_id_;
};

#endif /* DH_THREAD_H_ */
/*
 * dh_thread.cpp
 *
 *  Created on: 2013-3-13
 *  Author: Sai
 */

#include "dh_thread.h"

#include <stdio.h>
#include <iostream>

#include "dh_thread_pool.h"

using namespace std;

DhThread::DhThread(DhThreadPool* pool)
{
	task_ = NULL;
	is_destroy_ = false;
	mutex_ = NULL;
	cond_ = NULL;
	pool_ = pool;
	thread_id_ = 0;

	pthread_mutex_init(&mutex_, NULL);
	pthread_cond_init(&cond_, NULL);
}

void DhThread::Start()
{
	pthread_create(&thread_, NULL, &DhThread::DoTask, this);
	thread_id_ = (int)thread_.p;
}

DhThread::~DhThread()
{
	pthread_mutex_destroy(&mutex_);
	pthread_cond_destroy(&cond_);
}

void DhThread::Lock()
{
	pthread_mutex_lock(&mutex_);
}

void DhThread::Unlock()
{
	pthread_mutex_unlock(&mutex_);
}

void DhThread::AddToFreeThreadQueue()
{
	pool_->AddFreeThreadToQueue(this);
}

void DhThread::Wait()
{
	pthread_cond_wait(&cond_, &mutex_);
}

void DhThread::Join()
{
	int res = pthread_join(thread_, NULL);
	cout << "res:  " << res << endl;

}

void DhThread::Destroy()
{
	Lock();
	is_destroy_ = true;
	Notify();
	Unlock();
	Join();
}

int DhThread::GetId()
{
	return thread_id_;
}

void* DhThread::DoTask(void* userdata)
{
	DhThread* thread = (DhThread*) userdata;
	while (true)
	{
		thread->Lock();
		if (thread->is_destroy_)
		{
			thread->Unlock();
			break;
		}
		thread->Unlock();

		DhThreadTask* task = thread->task_;
		if (task)
		{
			(*task->task_func_)(task->userdata_);
			cout << "task finish: " << thread->GetId() << endl;
			delete task;
			thread->task_ = NULL;
		}

		//1.          .
		//2.    .
		//1.       ,       .
		thread->Lock();
		if (thread->is_destroy_)
		{
			thread->Unlock();
			break;
		}
		thread->AddToFreeThreadQueue();
		thread->Wait();
		thread->Unlock();
	}

	cout << "thread finish: " << thread->GetId() << endl;
	return NULL;
}

void DhThread::Notify()
{
	pthread_cond_signal(&cond_);
}


ファイル5
dh_thread_task.h
/*
 * dh_thread_task.h
 *
 *  Created on: 2013-3-13
 *  Author: Sai
 */

#ifndef DH_THREAD_TASK_H_
#define DH_THREAD_TASK_H_

typedef void * (*TaskFunc)(void * arg);

class DhThreadTask
{
public:
	DhThreadTask(TaskFunc task_func, void* userdata)
	{
		userdata_ = userdata;
		task_func_ = task_func;
	}
	~DhThreadTask()
	{
	}
	void* userdata_;
	TaskFunc task_func_;
};

#endif /* DH_THREAD_TASK_H_ */

ファイル6,7
dh_download_manager.h,dh_download_manager.cpp
/*
 * dh_download_manager.h
 *
 *  Created on: 2013-3-13
 *  Author: Sai
 */

#ifndef DH_DOWNLOAD_MANAGER_H_
#define DH_DOWNLOAD_MANAGER_H_

#include <stdlib.h>

typedef size_t (*WriteMemoryCallback)(void *contents, size_t size, size_t nmemb,
		void *userp);

class DhDownloadManager
{
public:
	DhDownloadManager();
	~DhDownloadManager();
	int Process(const char* url, WriteMemoryCallback callback_func,
			void* userdata);
};

#endif /* DH_DOWNLOAD_MANAGER_H_ */
/*
 * dh_download_manager.cpp
 *
 *  Created on: 2013-3-13
 *  Author: Sai
 */

#include "dh_download_manager.h"

#include <stdlib.h>
#include "curl/curl.h"

DhDownloadManager::DhDownloadManager()
{
	curl_global_init (CURL_GLOBAL_ALL);
}

DhDownloadManager::~DhDownloadManager()
{
	curl_global_cleanup();
}

int DhDownloadManager::Process(const char* url,
		WriteMemoryCallback callback_func, void* userdata)
{
	CURL *curl_handle = curl_easy_init();

	curl_easy_setopt(curl_handle, CURLOPT_URL, url);
	curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, callback_func);
	curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, userdata);
	curl_easy_setopt(curl_handle, CURLOPT_USERAGENT, "test-agent/1.0");
//	curl_easy_setopt(curl_handle, CURLOPT_PROXY, "192.168.0.1:808");
//	curl_easy_setopt(curl_handle, CURLOPT_HTTPPROXYTUNNEL, 1L);
	int ret = curl_easy_perform(curl_handle);

	curl_easy_cleanup(curl_handle);
	return 0;
}


ファイル8:
main.cpp
/*
 * main.cpp
 *
 *  Created on: 2013-3-13
 *  Author: Sai
 */

#include <stdio.h>
#include <string.h>
#include <iostream>

using namespace std;

#include "dh_thread_pool.h"
#include "dh_thread_task.h"

#include "dh_download_manager.h"

typedef struct DiskData
{
	FILE* file;
}DiskData;

size_t WriteToDisk(void *contents, size_t size, size_t nmemb,
		void *userp)
{
	DiskData* dd = (DiskData*) userp;
	size_t number = nmemb*size;
	size_t writed_num = fwrite(contents,1,number,dd->file);
	return writed_num;
}

void *RunTaskFunc(void * arg)
{
	int* i = (int*) arg;
	cout << "thread index: " << *i << endl;
	DhDownloadManager* manager = new DhDownloadManager();
	static const char* url =
			"http://www.istonsoft.com/downloads/iston-video-converter.exe";

	DiskData dd;
	char path[8];
	memset(path,0,sizeof(path));
	sprintf(path,"%d.exe",*i);
	dd.file = fopen(path,"wb");
	manager->Process(url,&WriteToDisk,&dd);

	fclose(dd.file);
	return NULL;
}

int main(int argc, char *argv[])
{
	setbuf(stdout, (char*) NULL);
	setbuf(stderr, (char*) NULL);
	printf("Hello, world
"); DhThreadPool *pool = new DhThreadPool(5); pool->Activate(); for (int o = 0; o < 10; ++o) { int *i = new int; *i = o; pool->AddAsynTask(&RunTaskFunc, i); } getchar(); pool->Destroy(); delete pool; return 0; }

ヒント:プロジェクトファイルのダウンロード先:http://download.csdn.net/detail/infoworld/5138920