スレッドプールベース
28309 ワード
スレッドプールがある理由
スレッドプールで発生する問題はありません.
ほとんどのネットワークサーバは、Webサーバを含めて、単位時間当たりに膨大な数の接続要求を処理しなければならないが、処理時間が短いという特徴がある.従来のマルチスレッドサーバモデルでは、リクエストが到着すると、新しいスレッドが作成され、そのスレッドによってタスクが実行され、タスクが実行されると、スレッドが終了します.これが「即時作成、即時破棄」のポリシーです.スレッドの作成時間は、作成プロセスに比べて大幅に短縮されていますが、スレッドにコミットされたタスクが実行時間が短く、実行回数が非常に頻繁である場合、サーバはスレッドの作成と破棄を停止しない状態になります.このオーバーヘッドは無視できません.特に、スレッドの実行時間が非常に短い場合です.
スレッドプールの考え方
スレッドプールは、上記の問題を解決するために、アプリケーションが起動した後、すぐに一定数のスレッドを作成し、空きキューに入れる原理です.これらのスレッドはすべてブロックされた状態で、これらのスレッドはメモリが少ししかなく、CPUを占有しません.タスクが到来すると、スレッドプールは空きスレッドを選択し、このスレッドにタスクを転送して実行します.すべてのスレッドがタスクを処理している場合、スレッドプールは、より多くのタスクを処理するために自動的に一定の数の新しいスレッドを作成します.タスクが完了すると、スレッドは終了するのではなく、オンライン・プログラム・プールで次のタスクを待機します.ほとんどのスレッドがブロックされている場合、スレッドプールはスレッドの一部を自動的に破棄し、システムリソースを回収します.
スレッドプールの設計構想
1、現実の例
銀行サービスホールを例に挙げます.銀行のサービスホールをスレッドプールと見なすことができます.ホールの椅子に窓が空いているのを待っている預金者たちは(タスクキュー)、カウンターウィンドウのカウンターマンはタスクを実行するスレッド(実行キュー)と見なされます.銀行サービスホールのもう一つの非常に重要な構成はランプを提示することであり、このランプはカウンターと預金者の協力を調整し、各預金者が同時に1人のカウンターにサービスされることを保証し、1人のカウンターが空いていて、預金者が待っている間にこのカウンターが必ずこの預金者にサービスを提供することを保証する.銀行サービス庁のランプは、スレッドプールでスレッドとタスクを調整するプール管理コンポーネント(反発ロック、条件変数、信号量)などに相当します.
2、スレッドプールを構成するいくつかの部分1、以上の例について、1つのスレッドプールを実現するには、プログラムの開始時にスレッドを作成しなければならないことを知っています.この作成されたスレッドは、タスクが実行できないときにブロックされ、1つのタスクが実行されるときに1つのスレッドがタスクを取得することができ、他のスレッドがブロックされ続けます.タスクを取得したスレッドは、タスク関数を実行した後、スレッドプールが破棄されるまで次のタスクを待つ間に作成したすべてのスレッドが破棄されるまでブロックされます.2、1つのタスクを実行するたびに、そのタスクがタスクキューに追加され、ブロックされてタスクを待つスレッドにタスクが実行されることを通知すると、タスクの実行条件が必要なスレッドがタスクキューから1つのタスクを持って実行する.ここでは実際には生産者消費者モデルであり,生産者はタスクをタスクキューに入れるスレッドであり,消費者はタスクを実行するスレッドであり,タスクキューは取引場所である.この生産者消費者モデルを維持するには,反発ロック,条件変数,信号量などを用いなければならない.これらがスレッドプールの管理コンポーネントです.
C言語におけるスレッドプールのデータ構造設計:1.タスクキュー(実行するタスク関数の関数ポインタ、関数のパラメータ)2.実行キュー(スレッドid、スレッド終了識別、プール管理コンポーネント)3.プール管理コンポーネント(反発ロック、条件変数、タスクキュー、実行キュー)
スレッドプールのコード実装:Cバージョン
C++言語におけるスレッドプールデータ構造の設計:スレッドプールはクラスにカプセル化され、クラスのメンバーにはvectorまたはlistタイプのタスクキューがあり、作成されたのでスレッドのid、すべてのスレッドの終了フラグがある.プール管理コントロール(信号量、反発量、条件変数など).スレッドエントリ関数の関数ポインタなど.
スレッドプールの実装C++:スレッドプールコントロールファイル:パッケージング反発ロックと信号量locker.h
locker.cpp
スレッドプール:
スレッドプールで発生する問題はありません.
ほとんどのネットワークサーバは、Webサーバを含めて、単位時間当たりに膨大な数の接続要求を処理しなければならないが、処理時間が短いという特徴がある.従来のマルチスレッドサーバモデルでは、リクエストが到着すると、新しいスレッドが作成され、そのスレッドによってタスクが実行され、タスクが実行されると、スレッドが終了します.これが「即時作成、即時破棄」のポリシーです.スレッドの作成時間は、作成プロセスに比べて大幅に短縮されていますが、スレッドにコミットされたタスクが実行時間が短く、実行回数が非常に頻繁である場合、サーバはスレッドの作成と破棄を停止しない状態になります.このオーバーヘッドは無視できません.特に、スレッドの実行時間が非常に短い場合です.
スレッドプールの考え方
スレッドプールは、上記の問題を解決するために、アプリケーションが起動した後、すぐに一定数のスレッドを作成し、空きキューに入れる原理です.これらのスレッドはすべてブロックされた状態で、これらのスレッドはメモリが少ししかなく、CPUを占有しません.タスクが到来すると、スレッドプールは空きスレッドを選択し、このスレッドにタスクを転送して実行します.すべてのスレッドがタスクを処理している場合、スレッドプールは、より多くのタスクを処理するために自動的に一定の数の新しいスレッドを作成します.タスクが完了すると、スレッドは終了するのではなく、オンライン・プログラム・プールで次のタスクを待機します.ほとんどのスレッドがブロックされている場合、スレッドプールはスレッドの一部を自動的に破棄し、システムリソースを回収します.
スレッドプールの設計構想
1、現実の例
銀行サービスホールを例に挙げます.銀行のサービスホールをスレッドプールと見なすことができます.ホールの椅子に窓が空いているのを待っている預金者たちは(タスクキュー)、カウンターウィンドウのカウンターマンはタスクを実行するスレッド(実行キュー)と見なされます.銀行サービスホールのもう一つの非常に重要な構成はランプを提示することであり、このランプはカウンターと預金者の協力を調整し、各預金者が同時に1人のカウンターにサービスされることを保証し、1人のカウンターが空いていて、預金者が待っている間にこのカウンターが必ずこの預金者にサービスを提供することを保証する.銀行サービス庁のランプは、スレッドプールでスレッドとタスクを調整するプール管理コンポーネント(反発ロック、条件変数、信号量)などに相当します.
2、スレッドプールを構成するいくつかの部分1、以上の例について、1つのスレッドプールを実現するには、プログラムの開始時にスレッドを作成しなければならないことを知っています.この作成されたスレッドは、タスクが実行できないときにブロックされ、1つのタスクが実行されるときに1つのスレッドがタスクを取得することができ、他のスレッドがブロックされ続けます.タスクを取得したスレッドは、タスク関数を実行した後、スレッドプールが破棄されるまで次のタスクを待つ間に作成したすべてのスレッドが破棄されるまでブロックされます.2、1つのタスクを実行するたびに、そのタスクがタスクキューに追加され、ブロックされてタスクを待つスレッドにタスクが実行されることを通知すると、タスクの実行条件が必要なスレッドがタスクキューから1つのタスクを持って実行する.ここでは実際には生産者消費者モデルであり,生産者はタスクをタスクキューに入れるスレッドであり,消費者はタスクを実行するスレッドであり,タスクキューは取引場所である.この生産者消費者モデルを維持するには,反発ロック,条件変数,信号量などを用いなければならない.これらがスレッドプールの管理コンポーネントです.
C言語におけるスレッドプールのデータ構造設計:1.タスクキュー(実行するタスク関数の関数ポインタ、関数のパラメータ)2.実行キュー(スレッドid、スレッド終了識別、プール管理コンポーネント)3.プール管理コンポーネント(反発ロック、条件変数、タスクキュー、実行キュー)
スレッドプールのコード実装:Cバージョン
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
// ( )
#define LL_ADD(item, list) do { \
item->prev = NULL; \
item->next = list; \
list = item; \
} while(0)
//
#define LL_REMOVE(item, list) do { \
if (item->prev != NULL) item->prev->next = item->next; \
if (item->next != NULL) item->next->prev = item->prev; \
if (list == item) list = item->next; \
item->prev = item->next = NULL; \
} while(0)
typedef struct NWORKER {// , id, terminate
// , 。
pthread_t thread;
int terminate;
struct NWORKQUEUE *workqueue;
struct NWORKER *prev;
struct NWORKER *next;
} nWorker;
typedef struct NJOB {// , ,
// 。
void (*job_function)(struct NJOB *job);
void *user_data;
struct NJOB *prev;
struct NJOB *next;
} nJob;
typedef struct NWORKQUEUE {// ,
struct NWORKER *workers;// 。
struct NJOB *waiting_jobs;
pthread_mutex_t jobs_mtx;
pthread_cond_t jobs_cond;
} nWorkQueue;
typedef nWorkQueue nThreadPool;
static void *ntyWorkerThread(void *ptr) {//
nWorker *worker = (nWorker*)ptr;/
while (1) {
pthread_mutex_lock(&worker->workqueue->jobs_mtx);//
while (worker->workqueue->waiting_jobs == NULL) {
if (worker->terminate) break;//
pthread_cond_wait(&worker->workqueue->jobs_cond, &worker->workqueue->jobs_mtx);
}// 。
if (worker->terminate) {
pthread_mutex_unlock(&worker->workqueue->jobs_mtx);//
break;// while
}
nJob *job = worker->workqueue->waiting_jobs;w
if (job != NULL) {//
LL_REMOVE(job, worker->workqueue->waiting_jobs);//
}
pthread_mutex_unlock(&worker->workqueue->jobs_mtx);
if (job == NULL) continue;
job->job_function(job);// 。 while
}
free(worker);
pthread_exit(NULL);
}
// // //
int ntyThreadPoolCreate(nThreadPool *workqueue, int numWorkers) {
if (numWorkers < 1) numWorkers = 1;
memset(workqueue, 0, ezeof(nThreadPool));
pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;//
memcpy(&workqueue->jobs_cond, &blank_cond, sizeof(workqueue->jobs_cond));
pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;//
memcpy(&workqueue->jobs_mtx, &blank_mutex, sizeof(workqueue->jobs_mtx));
int i = 0;
for (i = 0;i < numWorkers;i ++) {//
nWorker *worker = (nWorker*)malloc(sizeof(nWorker));//
if (worker == NULL) {
perror("malloc");
return 1;i
}
memset(worker, 0, sizeof(nWorker));
worker->workqueue = workqueue;//
// thread id,
int ret = pthread_create(&worker->thread, NULL, ntyWorkerThread, (void *)worker);
if (ret) {
perror("pthread_create");
free(worker);// ,
return 1;
}
LL_ADD(worker, worker->workqueue->workers);//
}
return 0;
}
//
void ntyThreadPoolShutdown(nThreadPool *workqueue) {
nWorker *worker = NULL;
e for (worker = workqueue->workers;worker != NULL;worker = worker->next) {
wiorker->terminate = 1;// 。
}
pthread_mutex_lock(&workqueue->jobs_mtx);
workqueue->workers = NULL;
workqueue->waiting_jobs = NULL;
pthread_cond_broadcast(&workqueue->jobs_cond);
pthread_mutex_unlock(&workqueue->jobs_mtx);
}
void ntyThreadPoolQueue(nThreadPool *workqueue, nJob *job) {
pthread_mutex_lock(&workqueue->jobs_mtx);
LL_ADD(job, workqueue->waiting_jobs);// ,
w
pthread_cond_signal(&workqueue->jobs_cond);x//
pthread_mutex_unlock(&workqueue->jobs_mtx);
}
/************************** debug thread pool **************************/
#define KING_MAX_THREAD 80
#define KING_COUNTER_SIZE 1000
void king_counter(nJob *job) {//
int index = *(int*)job->user_data;
printf("index : %d, selfid : %lu
", index, pthread_self());// , id
free(job->user_data);
free(job);
}
int main(int argc, char *argv[]) {
nThreadPool pool;
ntyThreadPoolCreate(&pool, KING_MAX_THREAD);//
int i = 0;
for (i = 0;i < KING_COUNTER_SIZE;i ++) {
nJob *job = (nJob*)malloc(sizeof(nJob));
if (job == NULL) {
perror("malloc");
exit(1);
}
job->job_function = king_counter;
job->user_data = malloc(sizeof(int));
*(int*)job->user_data = i;//
ntyThreadPoolQueue(&pool, job);//
}
getchar();
printf("
");
}
C++言語におけるスレッドプールデータ構造の設計:スレッドプールはクラスにカプセル化され、クラスのメンバーにはvectorまたはlistタイプのタスクキューがあり、作成されたのでスレッドのid、すべてのスレッドの終了フラグがある.プール管理コントロール(信号量、反発量、条件変数など).スレッドエントリ関数の関数ポインタなど.
スレッドプールの実装C++:スレッドプールコントロールファイル:パッケージング反発ロックと信号量locker.h
#ifndef __LOCKER_H__
#define __LOCKER_H__
#include
#include
#include
class sem//
{
public:
sem();//
~sem();//
bool wait();// P
bool post();// V
private:
sem_t m_sem;
};
class locker
{
public:
locker();//
~locker();//
bool lock();//
bool unlock();//
private:
pthread_mutex_t m_mutex;
};
class cond
{
public:
cond();//
~cond();//
bool wait();//
bool signal();//
private:
pthread_mutex_t m_mutex;
pthread_cond_t m_cond;
};
#endif
locker.cpp
#include"locker.h"
sem::sem()
{
if(sem_init(&m_sem,0,0)!=0)
{
throw std::exception();
}
}
sem::~sem()
{
sem_destroy(&m_sem);
}
bool sem::wait()
{
return sem_wait(&m_sem)==0;
}
bool sem::post()
{
return sem_post(&m_sem)==0;
}
locker::locker()
{
if(pthread_mutex_init(&m_mutex,NULL)!=0)
{
throw std::exception();
}
}
locker::~locker()
{
pthread_mutex_destroy(&m_mutex);
}
bool locker::lock()
{
return pthread_mutex_lock(&m_mutex);
}
bool locker::unlock()
{
return pthread_mutex_unlock(&m_mutex);
}
cond::cond()
{
if(pthread_mutex_init(&m_mutex,NULL)!=0)
{
throw std::exception();
}
if(pthread_cond_init(&m_cond,NULL)!=0)
{
throw std::exception();
}
}
cond::~cond()
{
pthread_mutex_destroy(&m_mutex);
pthread_cond_destroy(&m_cond);
}
bool cond::wait()
{
int ret=0;
pthread_mutex_lock(&m_mutex);
ret=pthread_cond_wait(&m_cond,&m_mutex); //
pthread_mutex_unlock(&m_mutex);
return ret==0;
}
bool cond::signal()
{
return pthread_cond_signal(&m_cond)==0;
}
スレッドプール:
#include
#include
#include
#include
#include
#include"locker.h"
// , , T
template<typename T>
class threadpool
{
public:
threadpool(int thread_number=8,int max_requests=1000);
~threadpool();
bool append(T* request); //
private:
static void* worker(void *arg); //
void run(); //
private:
int m_thread_number; //
int m_max_requests; //
pthread_t *m_threads; // tid
std::list m_workqueue; //
locker m_queuelocker; //
sem m_queuestat; //
bool m_stop; //
};
template<typename T>
threadpool::threadpool(int thread_number,int max_requests)//
:m_thread_number(thread_number)
,m_max_requests(max_requests)
,m_stop(false)
,m_threads(NULL)
{
if((thread_number<=0)||(max_requests<=0))
{
throw std::exception();
}
m_threads=new pthread_t[thread_number]; // id
if(!m_threads)
{
throw std::exception();
}
for(int i=0;i// thread_number ,
{
if(pthread_create(m_threads+i,NULL,worker,(void*)this)!=0)
{// threadpool , ( , ), ( tid )
//
delete [] m_threads;
throw std::exception();
}
if(pthread_detach(m_threads[i]))//
{
delete [] m_threads;
throw std::exception();
}
}
}
template<typename T>
threadpool::~threadpool()
{
delete [] m_threads;
m_stop=true;
}
template<typename T>//
bool threadpool::append(T* request) //
{
m_queuelocker.lock();//
if(m_workqueue.size()>m_max_requests) //
{
m_queuelocker.unlock();
return false;
}
m_workqueue.push_back(request);
m_queuelocker.unlock();
m_queuestat.post(); // ,
// singal()
return true;
}
template<typename T>
void* threadpool::worker(void *arg)
{
threadpool *pool=(threadpool*)arg;
pool->run(); // run
return pool;
}
template<typename T>
void threadpool::run() //
{
while(!m_stop)
{
m_queuestat.wait();// 。
m_queuelocker.lock();
if(m_workqueue.empty())
{
m_queuelocker.unlock();
continue;
}
T* request=m_workqueue.front();
m_workqueue.pop_front();// 。
m_queuelocker.unlock();
request->process(); //process 。
}
}