複数の消費者と複数の生産者
5493 ワード
以前は、生産者と消費者が1つずつの場合について、mutexを使って生産者・消費者問題を解決しました。
ここで、2つ目の例で使ったイベントオブジェクトは、生産者・消費者のスレッドが1つずつの場合に用いるものです。Linuxでは、セマフォを使って毎回ロックを解除すればよいでしょう。
生産者や消費者を複数に変更する場合は、セマフォを用います。
まず、複数の生産者と1つの消費者の問題を解決します。
複数の消費者の状況:
ここで、2つ目の例で使ったイベントオブジェクトは、生産者・消費者のスレッドが1つずつの場合に用いるものです。Linuxでは、セマフォを使って毎回ロックを解除すればよいでしょう。
生産者や消費者を複数に変更する場合は、セマフォを用います。
まず、複数の生産者と1つの消費者の問題を解決します。
const int BUFF_SIZE = 10; // Number of slots in share_object.arr; slot indices are taken modulo this value.
const int NTIMES = 50; // Total number of items: producers stop once share_object.index reaches it, and the consumer loops this many times.
static int producer_gcount = 0; // Set to 1 by the first producer that finds the quota reached; only selects which exit message is printed.
// State shared by every producer thread and the single consumer thread.
struct
{
// Held by producers and the consumer while they touch arr / index.
HANDLE mutex;
HANDLE sem_empty; // Producers wait on it before writing; the consumer releases it after clearing a slot.
HANDLE sem_stored; // Producers release it after writing; the consumer waits on it before clearing a slot.
// Buffer of BUFF_SIZE slots: producers store 1, the consumer stores 0.
int arr[BUFF_SIZE];
// Count of items produced so far; the slot written is index % BUFF_SIZE.
int index;
} share_object;
// Producer thread entry point (one consumer, many producers).
//
// Repeatedly waits on sem_empty, takes the mutex, stores 1 into the next
// slot of share_object.arr and releases sem_stored. Once share_object.index
// has reached NTIMES the thread prints a message, hands its sem_empty permit
// back and exits.
//
// lpParameter is not used. Always returns 0.
unsigned int WINAPI producer_thread( void* lpParameter)
{
    for (;;)
    {
        // Wait on sem_empty first, then take the lock guarding the buffer.
        WaitForSingleObject(share_object.sem_empty, -1);
        WaitForSingleObject(share_object.mutex, -1);

        const bool quota_reached = !(share_object.index < NTIMES);
        if (quota_reached)
        {
            // Only the first producer to get here announces completion.
            const bool first_to_finish = (producer_gcount == 0);
            if (first_to_finish)
            {
                ++producer_gcount;
                cout << "producer all done" << endl;
            }
            else
            {
                cout << " ******* other producer threads *****" << endl;
            }

            // Give the permit back: this thread wrote nothing, and another
            // producer waiting on sem_empty can then wake up and exit too.
            ReleaseSemaphore(share_object.sem_empty, 1, 0);
            ReleaseMutex(share_object.mutex);
            break;
        }

        // Write into the slot selected by the shared counter, then advance it.
        const int slot = share_object.index % BUFF_SIZE;
        share_object.index += 1;
        share_object.arr[slot] = 1;

        ReleaseMutex(share_object.mutex);
        // Signal the consumer that one more item is stored.
        ReleaseSemaphore(share_object.sem_stored, 1, 0);
    }
    return 0;
}
// Consumer thread entry point (single consumer).
//
// Performs exactly NTIMES iterations: waits on sem_stored, takes the mutex,
// stores 0 into slot (iteration % BUFF_SIZE), logs the iteration number,
// then releases the mutex and sem_empty.
//
// lpParameter is not used. Always returns 0.
unsigned int WINAPI consumer_thread( void* lpParameter)
{
    int consumed = 0;
    while (consumed < NTIMES)
    {
        // Wait for a stored item, then take the lock guarding the buffer.
        WaitForSingleObject(share_object.sem_stored, -1);
        WaitForSingleObject(share_object.mutex, -1);

        const int slot = consumed % BUFF_SIZE;
        share_object.arr[slot] = 0;
        cout << "consumer modified : " << consumed << endl;

        ReleaseMutex(share_object.mutex);
        // Signal the producers that a slot has been cleared.
        ReleaseSemaphore(share_object.sem_empty, 1, 0);

        ++consumed;
    }

    cout << "consumer done" << endl;
    return 0;
}
// Program entry point: creates the synchronization objects, starts 20
// producer threads and one consumer thread, and waits for all of them.
//
// argc / argv are not used. Always returns 0.
int main(int argc, char *argv[])
{
    // Single source of truth for the producer count. The original waited on
    // only 5 of the 20 handles, so main could move on while 15 producers
    // were still running.
    const int PRODUCER_COUNT = 20;

    // Unnamed mutex, not initially owned by this thread.
    share_object.mutex = CreateMutex(0, FALSE, 0);

    // sem_empty starts at 10, the same value as BUFF_SIZE in this file.
    // Maximum count is 100.
    share_object.sem_empty = CreateSemaphore(NULL, 10, 100, NULL);

    // sem_stored starts at 0: nothing has been stored yet, so the consumer
    // blocks until a producer releases it.
    share_object.sem_stored = CreateSemaphore(NULL, 0, 100, NULL);

    HANDLE producer_handles[PRODUCER_COUNT];
    for (int i = 0; i < PRODUCER_COUNT; ++i)
    {
        producer_handles[i] = (HANDLE) _beginthreadex(0, 0, producer_thread, 0, 0, 0);
    }
    HANDLE consumer_handle = (HANDLE) _beginthreadex(0, 0, consumer_thread, 0, 0, 0);

    // Wait for ALL producers (bWaitAll = TRUE, timeout -1), then the consumer.
    WaitForMultipleObjects(PRODUCER_COUNT, producer_handles, TRUE, -1);
    WaitForSingleObject(consumer_handle, -1);
    return 0;
}
複数の消費者がいる場合:
const int BUFF_SIZE = 10; // Number of slots in share_object.arr; slot indices are taken modulo this value.
const int NTIMES = 50; // Total number of items: producers stop once index reaches it, consumers stop once index_consumer reaches it.
// State shared by every producer thread and every consumer thread.
struct
{
// Held by producers and consumers while they touch the fields below.
HANDLE mutex;
HANDLE sem_empty; // Producers wait on it before writing; consumers release it after clearing a slot.
HANDLE sem_stored; // Producers release it after writing; consumers wait on it before clearing a slot.
// Buffer of BUFF_SIZE slots: producers store 1, consumers store 0.
int arr[BUFF_SIZE];
int index; // Count of items produced so far; the slot written is index % BUFF_SIZE.
int index_consumer; // Count of items consumed so far; the slot cleared is index_consumer % BUFF_SIZE.
bool bEnd; // Set by the first producer that finds index >= NTIMES, so the extra sem_stored release happens only once.
} share_object;
// Producer thread entry point (many producers, many consumers).
//
// Repeatedly waits on sem_empty, takes the mutex, stores 1 into the next
// slot of share_object.arr and releases sem_stored. Once share_object.index
// has reached NTIMES the thread exits; the first producer to notice this
// also releases sem_stored one extra time.
//
// lpParameter is not used. Always returns 0.
unsigned int WINAPI producer_thread( void* lpParameter)
{
    for (;;)
    {
        // Wait on sem_empty first, then take the lock guarding the buffer.
        WaitForSingleObject(share_object.sem_empty, -1);
        WaitForSingleObject(share_object.mutex, -1);

        const bool quota_reached = !(share_object.index < NTIMES);
        if (quota_reached)
        {
            if (share_object.bEnd == false)
            {
                share_object.bEnd = true;
                // One extra sem_stored permit, released exactly once, lets a
                // consumer blocked on sem_stored wake up after the last item.
                ReleaseSemaphore(share_object.sem_stored, 1, 0);
            }

            // Give the permit back: this thread wrote nothing, and another
            // producer waiting on sem_empty can then wake up and exit too.
            ReleaseSemaphore(share_object.sem_empty, 1, 0);
            ReleaseMutex(share_object.mutex);
            break;
        }

        // Write into the slot selected by the shared counter, then advance it.
        const int slot = share_object.index % BUFF_SIZE;
        share_object.index += 1;
        share_object.arr[slot] = 1;

        ReleaseMutex(share_object.mutex);
        // Signal the consumers that one more item is stored.
        ReleaseSemaphore(share_object.sem_stored, 1, 0);
    }
    return 0;
}
// Consumer thread entry point (many producers, many consumers).
//
// Repeatedly waits on sem_stored, takes the mutex, stores 0 into slot
// (index_consumer % BUFF_SIZE), increments index_consumer, logs it, then
// releases the mutex and sem_empty. Once index_consumer has reached NTIMES
// the thread re-releases sem_stored and exits.
//
// lpParameter is not used. Always returns 0.
unsigned int WINAPI consumer_thread( void* lpParameter)
{
    for (;;)
    {
        // Wait for a stored item, then take the lock guarding the buffer.
        WaitForSingleObject(share_object.sem_stored, -1);
        WaitForSingleObject(share_object.mutex, -1);

        const bool all_consumed = !(share_object.index_consumer < NTIMES);
        if (all_consumed)
        {
            // Pass the permit on so another consumer blocked on sem_stored
            // can wake up and exit as well.
            ReleaseSemaphore(share_object.sem_stored, 1, 0);
            ReleaseMutex(share_object.mutex);
            break;
        }

        const int slot = share_object.index_consumer % BUFF_SIZE;
        share_object.index_consumer += 1;
        share_object.arr[slot] = 0;
        // The counter is logged after the increment, so the output runs 1..NTIMES.
        cout << "consumer modified : " << share_object.index_consumer << endl;

        ReleaseMutex(share_object.mutex);
        // Signal the producers that a slot has been cleared.
        ReleaseSemaphore(share_object.sem_empty, 1, 0);
    }

    cout << "consumer done" << endl;
    return 0;
}
int main(int argc, char *argv[])
{
share_object.mutex = CreateMutex(0,FALSE,0);
// . 10
share_object.sem_empty = CreateSemaphore(NULL,10,100,NULL);
// . 0 .
share_object.sem_stored = CreateSemaphore(NULL,0,100,NULL);
share_object.bEnd = false;
HANDLE producer_handles[20] , consumer_handles[10];
for(int i = 0; i < 20; ++i)
producer_handles[i] =(HANDLE) _beginthreadex(0,0,producer_thread,0,0,0);
for(int i =0 ; i < 10; ++i)
consumer_handles[i] =(HANDLE) _beginthreadex(0,0,consumer_thread,0,0,0);
WaitForMultipleObjects(20,producer_handles,TRUE,-1);
WaitForMultipleObjects(10,consumer_handles,TRUE,-1);
return 0;
}