複数の消費者複数の生産者

5493 ワード

以前は、単一生産消費者がmutexを使用して生産者の消費者問題を解決していました.
ここで、第2の例では、イベントオブジェクトが、単一スレッドの場合に用いる.linux上で信号量で毎回ロックを解除すればよい.
複数に変更する場合は、信号量を用いる.
 
まず、複数の生産と1つの消費の問題を解決します.
const int BUFF_SIZE = 10;  //     
const int NTIMES = 50;     //       
static int producer_gcount = 0;   //          
struct
{
    HANDLE mutex;
    HANDLE sem_empty;   //      ,             
    HANDLE sem_stored;        //  
    int arr[BUFF_SIZE];
    int index;
}   share_object;
unsigned int WINAPI producer_thread( void* lpParameter)
{
    while(1)
    {
        WaitForSingleObject(share_object.sem_empty,-1) ;  //         
        WaitForSingleObject(share_object.mutex,-1);          //         
        if(share_object.index >= NTIMES){
            //            .
            //        ,         sem_empty   ,     
            if(producer_gcount == 0){
                ++producer_gcount;
                cout << "producer all done" << endl;
            } else {
                cout << " ******* other producer threads *****"  << endl;
            }

            ReleaseSemaphore(share_object.sem_empty,1,0);
            ReleaseMutex(share_object.mutex);

            break;
        }
        //         
        share_object.arr[share_object.index++ % BUFF_SIZE] = 1;
        ReleaseMutex(share_object.mutex);
        ReleaseSemaphore(share_object.sem_stored,1,0);  //    
    }
     return 0;
}
unsigned int WINAPI consumer_thread( void* lpParameter)
{

    for(int i = 0; i < NTIMES; ++i){
            WaitForSingleObject(share_object.sem_stored,-1);
            WaitForSingleObject(share_object.mutex,-1);
            share_object.arr[i%BUFF_SIZE] = 0;
            cout << "consumer modified : " << i << endl;
            ReleaseMutex(share_object.mutex);
            ReleaseSemaphore(share_object.sem_empty,1,0);
    }
    cout << "consumer done" << endl;
    return 0;
}



int main(int argc, char *argv[])
{

    share_object.mutex = CreateMutex(0,FALSE,0);
    
    //   3   ,          >=NTIMES;
    //     .   10 
    share_object.sem_empty = CreateSemaphore(NULL,10,100,NULL);

    //     .    0  .         .            1 .         
    //      msdn.     linux   sem_init ,           
    share_object.sem_stored = CreateSemaphore(NULL,0,100,NULL);

    //20       
    HANDLE producer_handles[20];
    for(int i = 0; i < 20;  ++i)
        producer_handles[i] =(HANDLE) _beginthreadex(0,0,producer_thread,0,0,0);

    HANDLE consumer_handle =(HANDLE) _beginthreadex(0,0,consumer_thread,0,0,0);

    WaitForMultipleObjects(5,producer_handles,TRUE,-1);
    WaitForSingleObject(consumer_handle,-1);


    return 0;
}

 
 
複数の消費者の状況:
 
const int BUFF_SIZE = 10;  //     
const int NTIMES = 50;     //       
struct
{
    HANDLE mutex;
    HANDLE sem_empty;   //      ,             
    HANDLE sem_stored;        //  
    int arr[BUFF_SIZE];
    int index;                     //        
    int index_consumer;    //        
    bool bEnd;                  //         ,       ,      .
}   share_object;
unsigned int WINAPI producer_thread( void* lpParameter)
{
    while(1)
    {
        WaitForSingleObject(share_object.sem_empty,-1) ;  //         
        WaitForSingleObject(share_object.mutex,-1);          //         
        if(share_object.index >= NTIMES){
            if(!share_object.bEnd){
                share_object.bEnd = true;
                //       ,            sem_stored
                ReleaseSemaphore(share_object.sem_stored,1,0);
            }

            ReleaseSemaphore(share_object.sem_empty,1,0);
            ReleaseMutex(share_object.mutex);
            break;
        }
        //         
        share_object.arr[share_object.index++ % BUFF_SIZE] = 1;
        ReleaseMutex(share_object.mutex);
        ReleaseSemaphore(share_object.sem_stored,1,0);  //    
    }
     return 0;
}
unsigned int WINAPI consumer_thread( void* lpParameter)
{
    while(1){
            WaitForSingleObject(share_object.sem_stored,-1);
            WaitForSingleObject(share_object.mutex,-1);

            if(share_object.index_consumer >= NTIMES){
                //                
                ReleaseSemaphore(share_object.sem_stored,1,0);
                ReleaseMutex(share_object.mutex);
                break;
            }
            share_object.arr[share_object.index_consumer++%BUFF_SIZE] = 0;
            cout << "consumer modified : " << share_object.index_consumer << endl;
            ReleaseMutex(share_object.mutex);
            ReleaseSemaphore(share_object.sem_empty,1,0);
    }
    cout << "consumer done" << endl;
    return 0;
}



int main(int argc, char *argv[])
{

    share_object.mutex = CreateMutex(0,FALSE,0);
    //     .   10 
    share_object.sem_empty = CreateSemaphore(NULL,10,100,NULL);
    //     .    0  .        
    share_object.sem_stored = CreateSemaphore(NULL,0,100,NULL);
    share_object.bEnd = false;

    HANDLE producer_handles[20] , consumer_handles[10];
    for(int i = 0; i < 20;  ++i)
        producer_handles[i] =(HANDLE) _beginthreadex(0,0,producer_thread,0,0,0);
    for(int i =0 ; i < 10; ++i)
        consumer_handles[i] =(HANDLE) _beginthreadex(0,0,consumer_thread,0,0,0);

    WaitForMultipleObjects(20,producer_handles,TRUE,-1);
    WaitForMultipleObjects(10,consumer_handles,TRUE,-1);



    return 0;
}