C++スレッドプールの使用——メモ

11015 ワード

一、なぜスレッドプールを使うのですか.
まず、CPUごとに同じ時点で1つのスレッドしか実行できないという真理を見てみましょう.複数のスレッドを「同時に実行」できるのは、システムが異なるスレッド間で切り替え、スケジューリングを行うためであり、速度が速く、同時に実行されているように見えるからです.
マルチスレッドを使用すると作業効率が向上するのは疑いの余地がありませんが、マルチスレッドでプログラムを設計するにはどうすればいいのでしょうか.最も簡単で原始的な方法は、各作業に対して、1つのスレッドを作成して処理し、この作業が終わるまでスレッドは終了しますが、このようにすると、1つの問題があります.もし1つの作業が短いが、作業数が大きい場合、私たちは多くのスレッドを作成しなければなりません.CPUはこれらのスレッドの間で絶えず切り替えます.すべての作業を行うことができ、システムは作業が完了したスレッド(スレッドリソースを解放する)を絶えず終了し、新しい作業を実行するスレッドを新規に作成します....これは非常によくありません.スレッド間の切り替えには時間がかかり、新しいスレッド、終了スレッドも時間がかかります.スレッドの数が多く、タスクが頻繁に更新されると、CPUの占有率はすぐに100%に達する可能性がありますが、実際に処理するのに消費されるCPUは50%未満になる可能性があります.
では、スレッドプールは上記の問題を解決する方法です.彼の考え方は:事前に一定数のスレッドを作成して、タスクが処理する必要がある时、これらのスレッドの中から1つの暇なことを探し出してして、终わった后にスレッドは退出しないで、引き続き新しいタスクを待っています;タスクが処理されない場合、これらのスレッドは新しいタスクが来るまで保留されます.では、このようなスレッドはいくつ必要ですか?答えはCPU数に応じて設定し、スレッドがsleep、Waitforをする可能性があることを考慮して...などの操作があるので、最適な数はCPU数です×2は,CPUごとに平均2スレッドを実行する.これにより,頻繁なスレッド切替,作成,破棄などの操作を回避し,作業に時間を費やすことで自然効率が向上する.
二、肝心な技術:
(一)信号量.1つの信号量をカウンタと見なすことができ、信号量内のカウンタが0であると、その信号量を待つスレッドがブロックされ、信号量内のカウンタ>0であると、その信号量を待つスレッドが活性化される.
○信号量の作成:
HANDLE CreateSemaphore( LPSECURITY_ATTRIBUTES lpSemaphoreAttributes, //     NULL  
                        LONG lInitialCount,                          //       
                        LONG lMaximumCount,                          //       
                        LPCTSTR lpName );                            //       
○増加数
BOOL ReleaseSemaphore(  HANDLE hSemaphore,         //       
                        LONG lReleaseCount,        //          
                        LPLONG lpPreviousCount );  //        ,       NULL

○待機信号量
WaitForSingleObject
WaitForMultipleObjects

信号量カウンタが0でない場合、上記待機関数はWAIT_を返すOBJECT_0(WaitForMultipleObjectsの場合はWAIT_OBJECT_0+インデックス)が返され、信号量のカウンタが自動的に1減少します.
(二)臨界資源.臨界リソースオブジェクトは、リソースが同じ時間に1つのスレッドしかアクセスできないことを保証するために使用されます.その意味は、あるスレッドがリソースを読み込んでいると仮定し、別のスレッドがこのリソースを修正しようとしている場合、リソースが保護されていないと、読み込まれたデータが期限切れのデータである可能性があり、エラーが発生し、他の予期せぬ問題を引き起こす可能性があるということです.
(三)ループキュー.ループキューは配列またはチェーンテーブルで実現でき、キューに「満」がある場合は配列を使用するのが便利です.2つのインデックスを設定します.1つはライト(nForWirte)、1つはリード(nForRead)、最大容量はnMaxSizeとします.式は次のようになります.
○読み:
if( nForRead == nForWrite )
{
      //      
}
else
{
      value = pDatas[ nForRead ];
      nForRead = ( nForRead + 1 ) % nMaxSize;
}
○書き込み:
int nIndex = ( nForWrite + 1 ) % nMaxSize;
if( nIndex == nForRead )
{
        //     
}
else
{
        pDatas[ nForWrite ] = value;
        nForWrite = nIndex;
}

三、コード
(一)CHXThreadPool.h
#pragma once

class CHXWorker
{
public:
    CHXWorker();
    virtual ~CHXWorker();
public:

    // Delete the Worker when DoWorker function has returns TRUE.
    virtual BOOL DoWorker() = 0;
};

class CHXThreadPool
{
public:
    CHXThreadPool();
    virtual ~CHXThreadPool();

public:
    BOOL     InitThreadPool( long nTaskQueue = 512, int nMaxThreads = 0 );
    void     RelaseThreadPool();
    BOOL     PostAWorker( CHXWorker * pWorker );
    int      GetCurrentWorkers();

protected:
    static int GetCPUNumber();
    static DWORD WINAPI DoWorker( LPVOID lpVoid );

protected:
    BOOL          m_bReady;
    HANDLE        m_hForWriter;
    HANDLE        m_hForReader;
    HANDLE *      m_phThreads;
    CHXWorker * * m_pWorkerQueue;
    long          m_nWriter;
    long          m_nReader;
    int           m_nMaxThreads;
    int           m_nMaxWorkers;
    CRITICAL_SECTION m_csQueue;
};

(二)CHXThreadPool.cpp
#include "stdafx.h"
#include "HXThreadPool.h"

CHXWorker::CHXWorker()
{

}
CHXWorker::~CHXWorker()
{

}

CHXThreadPool::CHXThreadPool()
    : m_bReady( FALSE )
    , m_hForWriter( NULL )
    , m_hForReader( NULL )
    , m_phThreads( NULL )
    , m_pWorkerQueue( NULL )
    , m_nWriter( 0 )
    , m_nReader( 0 )
    , m_nMaxWorkers( 0 )
    , m_nMaxThreads( 0 )
{
}

CHXThreadPool::~CHXThreadPool()
{
    RelaseThreadPool();
}

BOOL CHXThreadPool::InitThreadPool( long nTaskQueue, int nMaxThreads )
{
    BOOL bResult = FALSE;
    int  nThreadCreated = 0;

    __try
    {
        if( m_bReady )
            __leave;


        //        
        InitializeCriticalSection( &m_csQueue );

        //        ,   ,   。
        m_hForReader = CreateSemaphore( NULL, 0, nTaskQueue, NULL );
        m_hForWriter = CreateSemaphore( NULL, nTaskQueue, nTaskQueue, NULL );
        if( m_hForReader == NULL || m_hForWriter == NULL )
            __leave;

        //           
        m_pWorkerQueue = new CHXWorker * [ nTaskQueue ];
        if( m_pWorkerQueue == NULL )
            __leave;
        for( int i = 0; i < nTaskQueue; ++ i )
            m_pWorkerQueue[ i ] = NULL;
        m_nMaxWorkers = nTaskQueue;
        m_nReader = 0;
        m_nWriter = 0;

        //       
        if( nMaxThreads <= 0 )
            m_nMaxThreads = GetCPUNumber() * 2;
        else
            m_nMaxThreads = nMaxThreads;
        m_phThreads = new HANDLE [ m_nMaxThreads ];
        if( m_phThreads == NULL )
            __leave;
        for( int i = 0; i < m_nMaxThreads; ++ i )
        {
            m_phThreads[ i ] = CreateThread( NULL, 0, &CHXThreadPool::DoWorker, this, 0, NULL );
            if( m_phThreads[ i ] == NULL )
                __leave;
            else
                ++ nThreadCreated;
        }

        bResult = TRUE;
        m_bReady = TRUE;
    }
    __finally
    {
        if( ! bResult )  //     ,    
        {
            // Delete queue
            EnterCriticalSection( &m_csQueue );
            if( m_pWorkerQueue != NULL )
            {
                delete [] m_pWorkerQueue;
                m_pWorkerQueue = NULL;
                m_nReader = 0;
                m_nWriter = 0;
                m_nMaxWorkers = 0;
            }
            LeaveCriticalSection( &m_csQueue );

            // Clear Threads...
            if( m_phThreads != NULL )
            {
                ReleaseSemaphore( m_hForReader, nThreadCreated, NULL );
                WaitForMultipleObjects( nThreadCreated, m_phThreads, TRUE, INFINITE );
                for( int i = 0; i < nThreadCreated; ++ i )
                    CloseHandle( m_phThreads[ i ]);
                delete [] m_phThreads;
                m_phThreads = NULL;
            }

            // Close Semaphores...
            if( m_hForReader != NULL )
            {
                CloseHandle( m_hForReader );
                m_hForReader = NULL;
            }
            if( m_hForWriter != NULL )
            {
                CloseHandle( m_hForWriter );
                m_hForWriter = NULL;
            }

            // Delete cs...
            DeleteCriticalSection( &m_csQueue );
            m_dwTimedout = INFINITE;
        }
    }
    return bResult;
}

void CHXThreadPool::RelaseThreadPool()
{
    if( m_bReady )
    {
        m_bReady = FALSE;

        EnterCriticalSection( &m_csQueue );
        if( m_pWorkerQueue != NULL )
        {
            delete [] m_pWorkerQueue;
            m_pWorkerQueue = NULL;
            m_nReader = 0;
            m_nWriter = 0;
            m_nMaxWorkers = 0;
        }
        LeaveCriticalSection( &m_csQueue );

        // Clear Threads...
        if( m_phThreads != NULL )
        {
            ReleaseSemaphore( m_hForReader, m_nMaxThreads, NULL );
            WaitForMultipleObjects( m_nMaxThreads, m_phThreads, TRUE, INFINITE );
            for( int i = 0; i < m_nMaxThreads; ++ i )
                CloseHandle( m_phThreads[ i ]);
            delete [] m_phThreads;
            m_phThreads = NULL;
        }

        // Close Semaphores...
        if( m_hForReader != NULL )
        {
            CloseHandle( m_hForReader );
            m_hForReader = NULL;
        }
        if( m_hForWriter != NULL )
        {
            CloseHandle( m_hForWriter );
            m_hForWriter = NULL;
        }

        // Delete cs...
        DeleteCriticalSection( &m_csQueue );
        m_nReader = 0;
        m_nWriter = 0;
        m_dwTimedout = INFINITE;
    }
}

BOOL CHXThreadPool::PostAWorker( CHXWorker * pWorker )  //       
{
    DWORD dwWaitfor;
	
	//       ,   ,      (          )
    dwWaitfor = WaitForSingleObject( m_hForWriter, INFINITE );
	
	//            
    if( dwWaitfor == WAIT_OBJECT_0 )
    {
        long n;
        EnterCriticalSection( &m_csQueue );
        if( m_pWorkerQueue == NULL )
        {
            LeaveCriticalSection( &m_csQueue );
            return FALSE;
        }
        n = ( m_nWriter + 1 ) % m_nMaxWorkers;
        if( n == m_nReader )
        {
            LeaveCriticalSection( &m_csQueue );
            return FALSE;
        }
        else
        {
            m_pWorkerQueue[ m_nWriter ] = pWorker;
            m_nWriter = n;
            LeaveCriticalSection( &m_csQueue );
        }
		//          ,            
        ReleaseSemaphore( m_hForReader, 1, NULL );
    }
    else
        return FALSE;

    return TRUE;
}

int CHXThreadPool::GetCurrentWorkers()  //                    
{
    int n;
    EnterCriticalSection( &m_csQueue );
    n = ( m_nWriter - m_nReader + m_nMaxWorkers ) % m_nMaxWorkers;
    LeaveCriticalSection( &m_csQueue );
    return n;
}

DWORD CHXThreadPool::DoWorker( LPVOID lpVoid )  //     
{
    CHXThreadPool * pThreadPool = ( CHXThreadPool * ) lpVoid;
    CHXWorker * pWorker;
    DWORD dwWaitfor;
    while( TRUE )
    {
	    //        ,   ,          
        dwWaitfor = WaitForSingleObject( pThreadPool->m_hForReader, INFINITE );
        if( dwWaitfor == WAIT_OBJECT_0 )
        {
            EnterCriticalSection( &( pThreadPool->m_csQueue ));
            if( pThreadPool->m_pWorkerQueue == NULL )
            {
                LeaveCriticalSection( &( pThreadPool->m_csQueue ));
                break;
            }
            if( pThreadPool->m_nReader == pThreadPool->m_nWriter )
            {
                LeaveCriticalSection( &( pThreadPool->m_csQueue ));
                continue;
            }
			//         
            pWorker = pThreadPool->m_pWorkerQueue[ pThreadPool->m_nReader ];
            pThreadPool->m_pWorkerQueue[ pThreadPool->m_nReader ] = NULL;
            pThreadPool->m_nReader = ( pThreadPool->m_nReader + 1 ) % pThreadPool->m_nMaxWorkers;
            LeaveCriticalSection( &( pThreadPool->m_csQueue ));
			
			//        ,     ,             ,           
            ReleaseSemaphore( pThreadPool->m_hForWriter, 1, NULL );

            if( pWorker == NULL )
                continue;
            else
            {
                if( pWorker->DoWorker()) //          TRUE,       。         
                    delete pWorker;
            }
        }
    }
    return 0;
}

int CHXThreadPool::GetCPUNumber()  //      CPU   
{
    SYSTEM_INFO sysInfo;
    GetSystemInfo( &sysInfo );
    return ( int ) sysInfo.dwNumberOfProcessors;
}