C++スレッドプールの使用——メモ
11015 ワード
一、なぜスレッドプールを使うのですか.
まず、CPUごとに同じ時点で1つのスレッドしか実行できないという真理を見てみましょう.複数のスレッドを「同時に実行」できるのは、システムが異なるスレッド間で切り替え、スケジューリングを行うためであり、速度が速く、同時に実行されているように見えるからです.
マルチスレッドを使用すると作業効率が向上するのは疑いの余地がありませんが、マルチスレッドでプログラムを設計するにはどうすればいいのでしょうか.最も簡単で原始的な方法は、各作業に対して、1つのスレッドを作成して処理し、この作業が終わるまでスレッドは終了しますが、このようにすると、1つの問題があります.もし1つの作業が短いが、作業数が大きい場合、私たちは多くのスレッドを作成しなければなりません.CPUはこれらのスレッドの間で絶えず切り替えます.すべての作業を行うことができ、システムは作業が完了したスレッド(スレッドリソースを解放する)を絶えず終了し、新しい作業を実行するスレッドを新規に作成します....これは非常によくありません.スレッド間の切り替えには時間がかかり、新しいスレッド、終了スレッドも時間がかかります.スレッドの数が多く、タスクが頻繁に更新されると、CPUの占有率はすぐに100%に達する可能性がありますが、実際に処理するのに消費されるCPUは50%未満になる可能性があります.
では、スレッドプールは上記の問題を解決する方法です.彼の考え方は:事前に一定数のスレッドを作成して、タスクが処理する必要がある时、これらのスレッドの中から1つの暇なことを探し出してして、终わった后にスレッドは退出しないで、引き続き新しいタスクを待っています;タスクが処理されない場合、これらのスレッドは新しいタスクが来るまで保留されます.では、このようなスレッドはいくつ必要ですか?答えはCPU数に応じて設定し、スレッドがsleep、Waitforをする可能性があることを考慮して...などの操作があるので、最適な数はCPU数です×2は,CPUごとに平均2スレッドを実行する.これにより,頻繁なスレッド切替,作成,破棄などの操作を回避し,作業に時間を費やすことで自然効率が向上する.
二、肝心な技術:
(一)信号量.1つの信号量をカウンタと見なすことができ、信号量内のカウンタが0であると、その信号量を待つスレッドがブロックされ、信号量内のカウンタ>0であると、その信号量を待つスレッドが活性化される.
○信号量の作成:
○待機信号量
信号量カウンタが0でない場合、上記待機関数はWAIT_を返すOBJECT_0(WaitForMultipleObjectsの場合はWAIT_OBJECT_0+インデックス)が返され、信号量のカウンタが自動的に1減少します.
(二)臨界資源.臨界リソースオブジェクトは、リソースが同じ時間に1つのスレッドしかアクセスできないことを保証するために使用されます.その意味は、あるスレッドがリソースを読み込んでいると仮定し、別のスレッドがこのリソースを修正しようとしている場合、リソースが保護されていないと、読み込まれたデータが期限切れのデータである可能性があり、エラーが発生し、他の予期せぬ問題を引き起こす可能性があるということです.
(三)ループキュー.ループキューは配列またはチェーンテーブルで実現でき、キューに「満」がある場合は配列を使用するのが便利です.2つのインデックスを設定します.1つはライト(nForWirte)、1つはリード(nForRead)、最大容量はnMaxSizeとします.式は次のようになります.
○読み:
三、コード
(一)CHXThreadPool.h
(二)CHXThreadPool.cpp
まず、CPUごとに同じ時点で1つのスレッドしか実行できないという真理を見てみましょう.複数のスレッドを「同時に実行」できるのは、システムが異なるスレッド間で切り替え、スケジューリングを行うためであり、速度が速く、同時に実行されているように見えるからです.
マルチスレッドを使用すると作業効率が向上するのは疑いの余地がありませんが、マルチスレッドでプログラムを設計するにはどうすればいいのでしょうか.最も簡単で原始的な方法は、各作業に対して、1つのスレッドを作成して処理し、この作業が終わるまでスレッドは終了しますが、このようにすると、1つの問題があります.もし1つの作業が短いが、作業数が大きい場合、私たちは多くのスレッドを作成しなければなりません.CPUはこれらのスレッドの間で絶えず切り替えます.すべての作業を行うことができ、システムは作業が完了したスレッド(スレッドリソースを解放する)を絶えず終了し、新しい作業を実行するスレッドを新規に作成します....これは非常によくありません.スレッド間の切り替えには時間がかかり、新しいスレッド、終了スレッドも時間がかかります.スレッドの数が多く、タスクが頻繁に更新されると、CPUの占有率はすぐに100%に達する可能性がありますが、実際に処理するのに消費されるCPUは50%未満になる可能性があります.
では、スレッドプールは上記の問題を解決する方法です.彼の考え方は:事前に一定数のスレッドを作成して、タスクが処理する必要がある时、これらのスレッドの中から1つの暇なことを探し出してして、终わった后にスレッドは退出しないで、引き続き新しいタスクを待っています;タスクが処理されない場合、これらのスレッドは新しいタスクが来るまで保留されます.では、このようなスレッドはいくつ必要ですか?答えはCPU数に応じて設定し、スレッドがsleep、Waitforをする可能性があることを考慮して...などの操作があるので、最適な数はCPU数です×2は,CPUごとに平均2スレッドを実行する.これにより,頻繁なスレッド切替,作成,破棄などの操作を回避し,作業に時間を費やすことで自然効率が向上する.
二、肝心な技術:
(一)信号量.1つの信号量をカウンタと見なすことができ、信号量内のカウンタが0であると、その信号量を待つスレッドがブロックされ、信号量内のカウンタ>0であると、その信号量を待つスレッドが活性化される.
○信号量の作成:
HANDLE CreateSemaphore( LPSECURITY_ATTRIBUTES lpSemaphoreAttributes, // NULL
LONG lInitialCount, //
LONG lMaximumCount, //
LPCTSTR lpName ); //
○増加数BOOL ReleaseSemaphore( HANDLE hSemaphore, //
LONG lReleaseCount, //
LPLONG lpPreviousCount ); // , NULL
○待機信号量
WaitForSingleObject
WaitForMultipleObjects
信号量カウンタが0でない場合、上記待機関数はWAIT_を返すOBJECT_0(WaitForMultipleObjectsの場合はWAIT_OBJECT_0+インデックス)が返され、信号量のカウンタが自動的に1減少します.
(二)臨界資源.臨界リソースオブジェクトは、リソースが同じ時間に1つのスレッドしかアクセスできないことを保証するために使用されます.その意味は、あるスレッドがリソースを読み込んでいると仮定し、別のスレッドがこのリソースを修正しようとしている場合、リソースが保護されていないと、読み込まれたデータが期限切れのデータである可能性があり、エラーが発生し、他の予期せぬ問題を引き起こす可能性があるということです.
(三)ループキュー.ループキューは配列またはチェーンテーブルで実現でき、キューに「満」がある場合は配列を使用するのが便利です.2つのインデックスを設定します.1つはライト(nForWirte)、1つはリード(nForRead)、最大容量はnMaxSizeとします.式は次のようになります.
○読み:
if( nForRead == nForWrite )
{
//
}
else
{
value = pDatas[ nForRead ];
nForRead = ( nForRead + 1 ) % nMaxSize;
}
○書き込み:int nIndex = ( nForWrite + 1 ) % nMaxSize;
if( nIndex == nForRead )
{
//
}
else
{
pDatas[ nForWrite ] = value;
nForWrite = nIndex;
}
三、コード
(一)CHXThreadPool.h
#pragma once
class CHXWorker
{
public:
CHXWorker();
virtual ~CHXWorker();
public:
// Delete the Worker when DoWorker function has returns TRUE.
virtual BOOL DoWorker() = 0;
};
class CHXThreadPool
{
public:
CHXThreadPool();
virtual ~CHXThreadPool();
public:
BOOL InitThreadPool( long nTaskQueue = 512, int nMaxThreads = 0 );
void RelaseThreadPool();
BOOL PostAWorker( CHXWorker * pWorker );
int GetCurrentWorkers();
protected:
static int GetCPUNumber();
static DWORD WINAPI DoWorker( LPVOID lpVoid );
protected:
BOOL m_bReady;
HANDLE m_hForWriter;
HANDLE m_hForReader;
HANDLE * m_phThreads;
CHXWorker * * m_pWorkerQueue;
long m_nWriter;
long m_nReader;
int m_nMaxThreads;
int m_nMaxWorkers;
CRITICAL_SECTION m_csQueue;
};
(二)CHXThreadPool.cpp
#include "stdafx.h"
#include "HXThreadPool.h"
CHXWorker::CHXWorker()
{
}
CHXWorker::~CHXWorker()
{
}
CHXThreadPool::CHXThreadPool()
: m_bReady( FALSE )
, m_hForWriter( NULL )
, m_hForReader( NULL )
, m_phThreads( NULL )
, m_pWorkerQueue( NULL )
, m_nWriter( 0 )
, m_nReader( 0 )
, m_nMaxWorkers( 0 )
, m_nMaxThreads( 0 )
{
}
CHXThreadPool::~CHXThreadPool()
{
RelaseThreadPool();
}
BOOL CHXThreadPool::InitThreadPool( long nTaskQueue, int nMaxThreads )
{
BOOL bResult = FALSE;
int nThreadCreated = 0;
__try
{
if( m_bReady )
__leave;
//
InitializeCriticalSection( &m_csQueue );
// , , 。
m_hForReader = CreateSemaphore( NULL, 0, nTaskQueue, NULL );
m_hForWriter = CreateSemaphore( NULL, nTaskQueue, nTaskQueue, NULL );
if( m_hForReader == NULL || m_hForWriter == NULL )
__leave;
//
m_pWorkerQueue = new CHXWorker * [ nTaskQueue ];
if( m_pWorkerQueue == NULL )
__leave;
for( int i = 0; i < nTaskQueue; ++ i )
m_pWorkerQueue[ i ] = NULL;
m_nMaxWorkers = nTaskQueue;
m_nReader = 0;
m_nWriter = 0;
//
if( nMaxThreads <= 0 )
m_nMaxThreads = GetCPUNumber() * 2;
else
m_nMaxThreads = nMaxThreads;
m_phThreads = new HANDLE [ m_nMaxThreads ];
if( m_phThreads == NULL )
__leave;
for( int i = 0; i < m_nMaxThreads; ++ i )
{
m_phThreads[ i ] = CreateThread( NULL, 0, &CHXThreadPool::DoWorker, this, 0, NULL );
if( m_phThreads[ i ] == NULL )
__leave;
else
++ nThreadCreated;
}
bResult = TRUE;
m_bReady = TRUE;
}
__finally
{
if( ! bResult ) // ,
{
// Delete queue
EnterCriticalSection( &m_csQueue );
if( m_pWorkerQueue != NULL )
{
delete [] m_pWorkerQueue;
m_pWorkerQueue = NULL;
m_nReader = 0;
m_nWriter = 0;
m_nMaxWorkers = 0;
}
LeaveCriticalSection( &m_csQueue );
// Clear Threads...
if( m_phThreads != NULL )
{
ReleaseSemaphore( m_hForReader, nThreadCreated, NULL );
WaitForMultipleObjects( nThreadCreated, m_phThreads, TRUE, INFINITE );
for( int i = 0; i < nThreadCreated; ++ i )
CloseHandle( m_phThreads[ i ]);
delete [] m_phThreads;
m_phThreads = NULL;
}
// Close Semaphores...
if( m_hForReader != NULL )
{
CloseHandle( m_hForReader );
m_hForReader = NULL;
}
if( m_hForWriter != NULL )
{
CloseHandle( m_hForWriter );
m_hForWriter = NULL;
}
// Delete cs...
DeleteCriticalSection( &m_csQueue );
m_dwTimedout = INFINITE;
}
}
return bResult;
}
void CHXThreadPool::RelaseThreadPool()
{
if( m_bReady )
{
m_bReady = FALSE;
EnterCriticalSection( &m_csQueue );
if( m_pWorkerQueue != NULL )
{
delete [] m_pWorkerQueue;
m_pWorkerQueue = NULL;
m_nReader = 0;
m_nWriter = 0;
m_nMaxWorkers = 0;
}
LeaveCriticalSection( &m_csQueue );
// Clear Threads...
if( m_phThreads != NULL )
{
ReleaseSemaphore( m_hForReader, m_nMaxThreads, NULL );
WaitForMultipleObjects( m_nMaxThreads, m_phThreads, TRUE, INFINITE );
for( int i = 0; i < m_nMaxThreads; ++ i )
CloseHandle( m_phThreads[ i ]);
delete [] m_phThreads;
m_phThreads = NULL;
}
// Close Semaphores...
if( m_hForReader != NULL )
{
CloseHandle( m_hForReader );
m_hForReader = NULL;
}
if( m_hForWriter != NULL )
{
CloseHandle( m_hForWriter );
m_hForWriter = NULL;
}
// Delete cs...
DeleteCriticalSection( &m_csQueue );
m_nReader = 0;
m_nWriter = 0;
m_dwTimedout = INFINITE;
}
}
BOOL CHXThreadPool::PostAWorker( CHXWorker * pWorker ) //
{
DWORD dwWaitfor;
// , , ( )
dwWaitfor = WaitForSingleObject( m_hForWriter, INFINITE );
//
if( dwWaitfor == WAIT_OBJECT_0 )
{
long n;
EnterCriticalSection( &m_csQueue );
if( m_pWorkerQueue == NULL )
{
LeaveCriticalSection( &m_csQueue );
return FALSE;
}
n = ( m_nWriter + 1 ) % m_nMaxWorkers;
if( n == m_nReader )
{
LeaveCriticalSection( &m_csQueue );
return FALSE;
}
else
{
m_pWorkerQueue[ m_nWriter ] = pWorker;
m_nWriter = n;
LeaveCriticalSection( &m_csQueue );
}
// ,
ReleaseSemaphore( m_hForReader, 1, NULL );
}
else
return FALSE;
return TRUE;
}
int CHXThreadPool::GetCurrentWorkers() //
{
int n;
EnterCriticalSection( &m_csQueue );
n = ( m_nWriter - m_nReader + m_nMaxWorkers ) % m_nMaxWorkers;
LeaveCriticalSection( &m_csQueue );
return n;
}
DWORD CHXThreadPool::DoWorker( LPVOID lpVoid ) //
{
CHXThreadPool * pThreadPool = ( CHXThreadPool * ) lpVoid;
CHXWorker * pWorker;
DWORD dwWaitfor;
while( TRUE )
{
// , ,
dwWaitfor = WaitForSingleObject( pThreadPool->m_hForReader, INFINITE );
if( dwWaitfor == WAIT_OBJECT_0 )
{
EnterCriticalSection( &( pThreadPool->m_csQueue ));
if( pThreadPool->m_pWorkerQueue == NULL )
{
LeaveCriticalSection( &( pThreadPool->m_csQueue ));
break;
}
if( pThreadPool->m_nReader == pThreadPool->m_nWriter )
{
LeaveCriticalSection( &( pThreadPool->m_csQueue ));
continue;
}
//
pWorker = pThreadPool->m_pWorkerQueue[ pThreadPool->m_nReader ];
pThreadPool->m_pWorkerQueue[ pThreadPool->m_nReader ] = NULL;
pThreadPool->m_nReader = ( pThreadPool->m_nReader + 1 ) % pThreadPool->m_nMaxWorkers;
LeaveCriticalSection( &( pThreadPool->m_csQueue ));
// , , ,
ReleaseSemaphore( pThreadPool->m_hForWriter, 1, NULL );
if( pWorker == NULL )
continue;
else
{
if( pWorker->DoWorker()) // TRUE, 。
delete pWorker;
}
}
}
return 0;
}
int CHXThreadPool::GetCPUNumber() // CPU
{
SYSTEM_INFO sysInfo;
GetSystemInfo( &sysInfo );
return ( int ) sysInfo.dwNumberOfProcessors;
}