linuxの下で簡単な非同期マルチスレッドタスクの配布を実現

26273 ワード

linuxの下で簡単な非同期マルチスレッドタスクの配布を実現
非同期マルチタスクパラレル処理はlinuxシーンで多くの実装方法があります.現在の主流はいくつかの方案があります:マルチプロセス方案、1 master+N workers方式、apacheなどに類似します;マルチスレッド・スキーマ、またはスレッド・プールを維持します.この例では、単純な単一プロセスマルチスレッド非同期配布タスクモデルを実現します.本実装には以下の特徴がある:*メンテナンススレッドが簡単*直接関数を転送でき、関数のパラメータ*スレッド間でメッセージキュー通信を使用し、リアルタイム性が高く、簡単*メインスレッドから分離された簡単なタスクを実現できる*swooleでは、nodejsの非同期タスクは、特定のioまたはイベントの場合に非同期化する必要があり、いつでも非同期化できるわけではない.この例の実装では、いつでも非同期でメッセージキューに制限されるなどの要因があります.この例では、*スレッド間にロックメカニズムがなく、共有データが安全ではありません.*linuxメッセージキューの各要素は最大65535バイトしかありません.*一時的には、ワークスレッドとプライマリスレッドは非同期です.ただし、1つのワークスレッドは同時に1つのタスクしか受け入れられず、次のタスクを引き継ぐには実行が完了しなければなりません.合計3つのワークスレッドがブロックされている場合、新しいタスクを続行することはできません.後続の更新:*getcontext,setcontext,swapcontextを使用してコラボレーションスケジューリングを実現*ロック呼び出しとロックなし呼び出しの2つの方法を実現
code.c:
/*
 *                 
 * c99 thread.c -o thread -l pthread -D _XOPEN_SOURCE
 * */
#define _XOPEN_SOURCE 1
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

#define LOGF(STR, ...) logfmt(STR, __FILE__,__LINE__,__func__,##__VA_ARGS__)

int msgqid = 0;             //       worker         id,      
int msgtype = 9812;         //      ,  
int IPC_KEY = 100011;       //      IPC_KEY     ftok  

/*          */
/*             ,           ,              */
typedef struct message {
    long msg_type;              /*       */
    void* (*func)(void*);       /*                          */
    void* param;                /*        */
    void** retval;              /*                             */
    void* (*callback)(void* retval, void*origin_param);   /*     callback            ,           */
} message;

/*        */
typedef struct threadtask {
    pthread_t tid;
    int is_working;
    int working_times;
} threadtask;
/*      */
typedef struct taskpool {
     threadtask* pool;
     int len;
} taskpool;

taskpool* mainpool;

void logfmt(const char* str, const char* file, const int line, const char* func, ...);

/*protected    main   task consumer */
void* taskconsumer(void*ptr) {
    threadtask* myth = (threadtask*)ptr;
    struct message msgq;
    int i = 0;
    for (i=0; ;i++) {
        msgrcv(msgqid, &msgq, sizeof(message)-sizeof(long), msgtype, 0);
        myth->is_working = 1;
        myth->working_times++;
        LOGF("threadtask(%ld) got a task: loopid=%d taskfunc=%p param=%d
"
, myth->tid, i, msgq.func, (msgq.param==NULL?0:*(int*)msgq.param)); if (msgq.func == NULL) { LOGF("msgq.func is NULL, ignore this msg
"
); continue; } void*ret = msgq.func((void*)msgq.param); if (msgq.retval != NULL) { *(msgq.retval) = ret; } myth->is_working = 0; if (msgq.callback != NULL) { LOGF("this task has callback loopid=%d
"
, i); msgq.callback((void*)ret, msgq.param); } } } /*public user task*/ void* task1(void* ptr) { LOGF("task1 start, void*=%p v=%d
"
, ptr, *(int*)ptr); sleep(1); LOGF("task1 over
"
); return NULL; } void* task2(void* ptr) { LOGF("task2 start, void*=%x v=%d
"
, ptr, *(int*)ptr); sleep(10); LOGF("task2 over
"
); } void* callback1(void* ret, void* param) { LOGF("i am callback: ret=%p param=%p
"
, ret, param); } int recycletaskpool(taskpool* thepool); /*for main init workers, waiting msgqueue for task*/ taskpool* inittaskpool(int worker_num) { taskpool* mainpool = (taskpool*)malloc(sizeof(taskpool)); memset(mainpool, 0, sizeof(*mainpool)); mainpool->pool = (threadtask*)malloc(sizeof(threadtask)*worker_num); memset(mainpool->pool, 0, sizeof(*mainpool->pool)); mainpool->len = worker_num; int has_error = 0; int i = 0; for (i=0; ipool[i].tid); int ret = pthread_create(tid_ptr, NULL, (void *)taskconsumer, (void*)&(mainpool->pool[i])); if (ret != 0) { perror("create thread failed:"); has_error = 1; break; } pthread_detach(*tid_ptr); } if (has_error) { LOGF("create pthread error, go to recyclepool
"
); recycletaskpool(mainpool); } return mainpool; } /*for main */ int recycletaskpool(taskpool* thepool) { if (thepool != NULL) { if (thepool->pool != NULL) { free(thepool->pool); } free(thepool); } return 0; } /*public */ int async_do(void*(*func)(void*), void* param, void** ret, void* (*callback)(void*,void*)) { struct message msgq; msgq.msg_type = msgtype; msgq.func = func; msgq.param = param; msgq.retval = ret; msgq.callback = callback; // send msgq to queue int sendret = msgsnd(msgqid, &msgq, sizeof(message)-sizeof(long), 0); return sendret; } int main() { // msgqid = msgget(IPC_KEY, IPC_CREAT | 0666); // 4 int worker_num = 4; mainpool = inittaskpool(worker_num); char s[100] = {}; int sendret = 0; void* (*taskfunc) (void*) = NULL; void* (*thecallback) (void*,void*) = NULL; int dorecycle = 0; int i = 0; for (i=0; i<16; i++) { LOGF("input which task run: i=%d
"
, i); scanf("%s", s); switch (s[0]) { case '1': taskfunc = task1; thecallback = NULL; break; case '2': taskfunc = task2; thecallback = NULL; break; case '3':case '4':case '5':case '6':case '7':case '8': taskfunc = task2; thecallback = callback1; break; case 'q': dorecycle = 1; // break; default: continue; break; } if (dorecycle) { break; } sendret = async_do(taskfunc, &s[0], NULL, thecallback); //LOGF("you put v=%d when i=%d ret=%d
", v, i, sendret);
} // do recycle int rmmsgret = msgctl(msgqid, IPC_RMID, NULL); LOGF("msgqid %d have removed: ret=%d
"
, msgqid, rmmsgret); recycletaskpool(mainpool); LOGF("mainpool has been recycled
"
); } // log void logfmt(const char* str, const char* file, const int line, const char* func, ...) { time_t now = time(NULL); struct tm* tn = localtime(&now); static char prestr[256] = {}; // sprintf(prestr, "[%04d-%02d-%02d %02d:%02d:%02d] [%s:%d %s()] %s\x00", tn->tm_year+1900,tn->tm_mon+1,tn->tm_mday,tn->tm_hour,tn->tm_min,tn->tm_sec, file, line, func, str); va_list ap; va_start(ap, func); // printf vprintf vprintf(prestr, ap); va_end(ap); }

実行結果:
[root@iZ25gcs79rvZ thread]# ./thread 
[2017-07-13 16:54:44] [thread.c:164 main()] input which task run: i=0
1
[2017-07-13 16:54:46] [thread.c:164 main()] input which task run: i=1
[2017-07-13 16:54:46] [thread.c:57 taskconsumer()] threadtask(139760144594688) got a task: loopid=0 taskfunc=0x400bc9 param=49
[2017-07-13 16:54:46] [thread.c:78 task1()] task1 start, void*=0x7fff4f3fb0b0 v=49
2
[2017-07-13 16:54:46] [thread.c:164 main()] input which task run: i=2
[2017-07-13 16:54:46] [thread.c:57 taskconsumer()] threadtask(139760152987392) got a task: loopid=0 taskfunc=0x400c32 param=50
[2017-07-13 16:54:46] [thread.c:85 task2()] task2 start, void*=4f3fb0b0 v=50
2
[2017-07-13 16:54:47] [thread.c:164 main()] input which task run: i=3
[2017-07-13 16:54:47] [thread.c:57 taskconsumer()] threadtask(139760161380096) got a task: loopid=0 taskfunc=0x400c32 param=50
[2017-07-13 16:54:47] [thread.c:85 task2()] task2 start, void*=4f3fb0b0 v=50
2[2017-07-13 16:54:47] [thread.c:80 task1()] task1 over

[2017-07-13 16:54:47] [thread.c:164 main()] input which task run: i=4
[2017-07-13 16:54:47] [thread.c:57 taskconsumer()] threadtask(139760169772800) got a task: loopid=0 taskfunc=0x400c32 param=50
[2017-07-13 16:54:47] [thread.c:85 task2()] task2 start, void*=4f3fb0b0 v=50
2
[2017-07-13 16:54:48] [thread.c:164 main()] input which task run: i=5
[2017-07-13 16:54:48] [thread.c:57 taskconsumer()] threadtask(139760144594688) got a task: loopid=1 taskfunc=0x400c32 param=50
[2017-07-13 16:54:48] [thread.c:85 task2()] task2 start, void*=4f3fb0b0 v=50
2
[2017-07-13 16:54:48] [thread.c:164 main()] input which task run: i=6

[2017-07-13 16:54:56] [thread.c:87 task2()] task2 over
[2017-07-13 16:54:56] [thread.c:57 taskconsumer()] threadtask(139760152987392) got a task: loopid=1 taskfunc=0x400c32 param=50
[2017-07-13 16:54:56] [thread.c:85 task2()] task2 start, void*=4f3fb0b0 v=50
[2017-07-13 16:54:57] [thread.c:87 task2()] task2 over
[2017-07-13 16:54:57] [thread.c:87 task2()] task2 over
[2017-07-13 16:54:58] [thread.c:87 task2()] task2 over
q
[2017-07-13 16:55:04] [thread.c:196 main()] msgqid 917504 have removed: ret=0
[2017-07-13 16:55:04] [thread.c:198 main()] mainpool has been recycled