Linux-マルチスレッドによる生産者と消費者モデルの実現


一、生産者-消費者モデルの概要実際のソフトウェア開発の過程で、私たちはデータを生成するモジュールを生産者と呼び、データを処理するモジュールが消費者になる.しかし、この2つだけでは生産者-消費者モデルになるには十分ではなく、仲介としてバッファ(メモリ領域)が必要であり、生産者が生成したデータはバッファに入れられ、消費者はバッファからデータを読み出して処理する.(注:上記のモジュールは一般化されており、クラス、関数、スレッド、プロセスなどであってもよい)
この2つの関係図を表すことができます.
まとめ:3-2-1の方法で生産者-消費者モデルを簡単に説明し、3は3つの関係を代表している:生産者-生産者、消費者-消費者、生産者-消費者;2は2つの役割を表します:生産者、消費者;1取引場所を表す:バッファ
二、生産者-消費者モデル間の関係シミュレーション生産者-消費者モデルを実現する前に、私たちはまずこの関係を取り除く必要があります:生産者-生産者:明らかに、この2つの間は必ず競争関係であり、つまり1人の生産者がバッファにデータを置くと、もう1人の生産者はこの空間の消費者-消費者にアクセスできません:同様に、2人の消費者の間も競争の関係であり、これは2人が同時に1つの商品を気に入ったときのようなもので、彼らの間は競争の関係の生産者である消費者である:生産者と消費者の間は実は同期と反発の関係であり、1人の生産者1人の消費者だけがデータを入れた後に消費者が読むことができると仮定している.消費者がデータを手に入れた後、生産者は生産に行きます.これは同期です.しかし、生産者がデータを生産するときに消費者がバッファからデータを受け取ることができないか、消費者がデータを読むときに生産者がバッファにデータを書くことができない.そうしないと、両者がデータを保存/取得するのに失敗し、二義的な問題が発生する可能性が高い.
三、どのように単一生産者-単一消費者モデルを実現するか単一生産者-単一消費者モデルにとって、私たちは単一チェーンテーブルで取引場所をシミュレートすることができ、生産データはチェーンテーブルにノードを挿入することに相当し、消費者がデータを読み取ることはチェーンテーブルノードを削除することに相当する.(ここではヘッドプラグの削除方式を採用していますが、もちろんテールプラグ、テール削除方式で実現することもできます)
生産場所が決定された以上、次に、同期と反発を実現する方法を考慮します.マルチスレッドプログラミングでは、「条件変数(Condition Variable)」に言及しています.ここでは、初期化と破棄のいくつかの関数を示します.

int pthread_cond_destroy(pthread_cond_t *cond);  
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);  
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;  
//attr   NULL,      。  Condition Variable      ,       PTHEAD_COND_INITIALIZER   ,    pthread_cond_init       attr   NULL。  

操作関数:
int pthread_cond_broadcast(pthread_cond_t *cond);  
//       Condition Variable          


int pthread_cond_signal(pthread_cond_t *cond);    
//       Condition Variable           


int pthread_cond_timedwait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex,  
              const struct timespec *restrict abstime);  
//pthread_cond_timedwait                  ,     abstime                     ,   ETIMEDOUT。  


int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);  
//  pthread_cond_wait   Condition Variable     ,           :  
1.  Mutex  
2.      
3.     ,    Mutex     

条件変数の相関関数を用いて実装をシミュレートすることができ,生産者スレッドがデータを生産すると消費者スレッドがwaitし,起動するとチェーンテーブルにデータがある(消費者が成功を待つ)消費者がデータを読み取る過程で生産者はwaitで、消費者がデータを読み取った後に起動され、その後バッファにデータを書き続け(ループ......)、起動されるのを待つ過程は上述のpthread_cond_waitとpthread_cond_signal関数を利用して実現することができる.
具体的なコードは以下の通りです.
pthread_mutex_t mylock = PTHREAD_MUTEX_INITIALIZER;  
pthread_cond_t mycond = PTHREAD_COND_INITIALIZER;  

typedef struct _node  
{  
    int data;  
    struct _node* next;  
}node,*pnode,**ppnode;  

pnode alloc_node(int data)  
{  
    pnode _n = (pnode)malloc(sizeof(node));  
    if( _n == NULL )  
    {  
        perror("malloc");  
        return NULL;  
    }  
    _n->data = data;  
    _n->next = NULL;  
    return _n;  
}  

void InitList(ppnode _h)  
{  
    *_h = alloc_node(0);  
}  

void PushFront(pnode _h,int data)  
{  
    assert(_h);  
    pnode ret = alloc_node(data);  
    ret->next = _h->next;  
    _h->next = ret;  
}  
void DelNode(pnode node)  
{  
    assert(node);  
    free(node);  
}  
void PopFront(pnode _h,int* data)  
{  
    assert(_h);  
    assert(data);  
    if(_h->next == NULL)  
    {  
        printf("list is empty!
"
); return ; } pnode del = _h->next; _h->next = del->next; *data = del->data; DelNode(del); } void destory(pnode _h) { assert(_h); pnode start = _h; // pnode del = _h; int data = 0; while(start->next) { PopFront(start,&data); // del = start; // start = del->next; // free(del); // del = NULL; } DelNode(_h); } void show(pnode _h) { assert(_h); pnode start = _h->next; while(start) { printf("%d ",start->data); start = start->next; } printf("
"
); } void* thread_product(void* arg) { pnode node = (pnode)arg; while(1) { usleep(123123); int data = rand()%10000; pthread_mutex_lock(&mylock); PushFront(node,data); pthread_mutex_unlock(&mylock); pthread_cond_signal(&mycond); printf("product:%d
"
,data); } } void* thread_consumer(void* arg) { pnode node = (pnode)arg; int data = 0; while(1) { usleep(1000); pthread_mutex_lock(&mylock); while(node->next == NULL) { pthread_cond_wait(&mycond,&mylock); } PopFront(node,&data); pthread_mutex_unlock(&mylock); printf("consumer:%d
"
,data); } } void test_list(pnode head) { int i = 0; for(; i<10; i++) { PushFront(head,i); show(head); } int data = 0; for(; i>=0; i--) { PopFront(head,&data); show(head); } destory(head); show(head); } int main() { pnode head = NULL; InitList(&head); // test_list(head); pthread_t tid1; pthread_t tid2; pthread_create(&tid1,NULL,thread_product,(void*)head); pthread_create(&tid2,NULL,thread_consumer,(void*)head); pthread_join(tid1,NULL); pthread_join(tid2,NULL); pthread_mutex_destroy(&mylock); pthread_cond_destroy(&mycond); return 0; }