[マイグレーション]zeroMQ学習

13140 ワード

お知らせ:網易(NetEase)ブログが閉鎖されるため、この記事をここに移しました。
ZeroMQ が何であるかの説明は省きます。以下は学習用に書いたコードで、バックアップとして残しておきます。
環境:VS 2008、Windows 7、zeromq-3.2.2(VS 2008 でそのままコンパイルできます。利用時に必要なのはヘッダファイル、lib、dll だけです)。以下では主に実行方法を記録します(libzmq.dll を exe と同じディレクトリに置くのを忘れないでください)。
  • 応答(リクエスト・リプライ)モード:コンパイルした exe をダブルクリックして起動し、--srr と入力する(応答モードのサーバが起動する)。もう一度 exe をダブルクリックして起動し、--crr と入力する(応答モードのクライアントが起動する)。
  • サブスクリプション(パブリッシュ・サブスクライブ)モード:コンパイルした exe をダブルクリックして起動し、--cps と入力する(1 番目のサブスクライバが起動する)。もう一度 exe を起動して --cps と入力する(2 番目のサブスクライバが起動する)。さらに exe を起動して --sps と入力する(パブリッシャが起動する)。
  • パイプライン:コンパイルした exe をダブルクリックして起動し、--ts と入力する(task sink が起動する)。exe を起動して --tv と入力する(task ventilator が起動する。ここで入力待ちになるので、そのまま少し待たせておく)。exe を起動して --tw と入力する(1 番目の task worker が起動する)。もう一度 exe を起動して --tw と入力する(2 番目の task worker が起動する)。これで task worker が sink と ventilator の両方に接続される。最後に task ventilator の画面で c を入力して Enter を押す(ここで結果が表示されます。スクリーンショットは載せないので、結果を見たい人は自分で一度実行してみてください。いくつか面白い問題に気づくかもしれません)。
  • //
              ,          :(   1   )
    /
    // lpserver.h   
    #ifndef __LP_SERVER_H__
    #define __LP_SERVER_H__
    
    #include 
    #include 
    
    #include 
    #include 
    #include 
    #include 
    
    #include "zhelpers.h"
    
    
    
    //     ,       ,       ,
    int server_request_reply()
    {
    void *ctx = zmq_ctx_new();
    
    void *responder = zmq_socket(ctx, ZMQ_REP);
    int ret = zmq_bind(responder, "tcp://*:5555");
    assert(ret == 0);
    while (1)
    {
     zmq_msg_t request;
     zmq_msg_init(&request);
     zmq_msg_recv(&request, responder, 0);
    
     printf("Received Hello 
    "); zmq_msg_close(&request); Sleep(1); zmq_msg_t reply; zmq_msg_init_size(&reply, 5); memcpy(zmq_msg_data(&reply), "World", 5); zmq_msg_send(&reply, responder, 0); zmq_msg_close(&reply); } zmq_close(responder); zmq_ctx_destroy(ctx); return 0; } int client_request_reply() { void *ctx = zmq_ctx_new(); void *client = zmq_socket(ctx, ZMQ_REQ); int ret = zmq_connect(client,"tcp://localhost:5555"); assert(ret == 0); while(1) { zmq_msg_t data; zmq_msg_init_size(&data, 5); memcpy(zmq_msg_data(&data), "Hello", 5); zmq_msg_send(&data, client, 0); zmq_msg_close(&data); Sleep(1); zmq_msg_t request; zmq_msg_init(&request); zmq_msg_recv(&request, client, 0); printf("Received Hello
    "); zmq_msg_close(&request); } Sleep(2); zmq_close(client); zmq_ctx_destroy(ctx); return 0; } // // 1:n publish - subscribe int server_publish_subscribe() { void *ctx = zmq_ctx_new(); void *publisher = zmq_socket(ctx, ZMQ_PUB); int rc = zmq_bind(publisher, "tcp://127.0.0.1:5556"); s_error(rc); assert(rc == 0); //rc = zmq_bind(publisher, "ipc://weather.ipc"); //s_error(rc); //assert(rc == 0); int i = 0; int times = 0; srand((unsigned)time(NULL)); while (1) { int zipcode, temperature, relhumidity; zipcode = randof(10); temperature = randof(215) - 80; relhumidity = randof(50) + 10; // send message to all subscribers char update[20]; sprintf(update, "%d %d %d %d", zipcode, temperature, relhumidity, (i++)); s_send(publisher, update); Sleep(1); i = (i > 200000000) ? 0:i; // if (zipcode == 5) // { // times++; // printf("set 5 %d
    ", i); // if (times > 30) // break; // } } zmq_close(publisher); zmq_ctx_destroy(ctx); return 0; } int client_publish_subscribe() { void *ctx = zmq_ctx_new(); void *subscriber = zmq_socket(ctx, ZMQ_SUB); int rc = zmq_connect(subscriber, "tcp://localhost:5556"); assert(rc == 0); char *filter = "5"; // 1 5 rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, filter, strlen(filter)); //rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0); assert(rc == 0); int update_nbr; long total_temp = 0; for (update_nbr = 0; update_nbr < 10; update_nbr++) { char *string = s_recv(subscriber); int zipcode, temperature, relhumidity, i; sscanf(string, "%d %d %d %d", &zipcode, &temperature, &relhumidity, &i); printf("get %d te:%d ry:%d index:%d
    ", zipcode, temperature, relhumidity, i); total_temp += temperature; free(string); } printf(" Average temperature of zipcode '%s' was %dF
    ", filter, (int)(total_temp / update_nbr)); zmq_close(subscriber); zmq_ctx_destroy(ctx); return 0; } // int task_ventilator() { void *ctx = zmq_ctx_new(); void *sender = zmq_socket(ctx, ZMQ_PUSH); int rc = zmq_bind(sender, "tcp://*:5557"); s_error(rc); assert(rc == 0); void *sink = zmq_socket(ctx, ZMQ_PUSH); rc = zmq_connect(sink, "tcp://localhost:5558"); s_error(rc); assert(rc == 0); printf("Press enter when the workers are ready:"); while (1) { int rel = getchar(); if (rel == 'c') break; } printf(" Sending task to workers...
    "); s_send(sink, "0"); srand((unsigned)time(NULL)); int task_nbr; int total_msec = 0; for (task_nbr = 0; task_nbr < 100; task_nbr++) { int workload; workload = randof(100) + 1; total_msec += workload; char string[10]; sprintf(string, "%d", workload); s_send(sender, string); } printf("Total expected cost: %d msec
    ", total_msec); Sleep(1); zmq_close(sink); zmq_close(sender); zmq_ctx_destroy(ctx); return 0; } int task_worker() { void *ctx = zmq_ctx_new(); void *receiver = zmq_socket(ctx, ZMQ_PULL); int rc = zmq_connect(receiver, "tcp://localhost:5557"); s_error(rc); assert(rc == 0); void *sender = zmq_socket(ctx, ZMQ_PUSH); rc = zmq_connect(sender, "tcp://localhost:5558"); s_error(rc); assert(rc == 0); while (1) { char *string = s_recv(receiver); fflush(stdout); printf("%s.", string); s_sleep(atoi(string)); free(string); s_send(sender, ""); } zmq_close(receiver); zmq_close(receiver); zmq_ctx_destroy(ctx); return 0; } int task_sink() { void *ctx = zmq_ctx_new(); void *receiver = zmq_socket(ctx, ZMQ_PULL); int rc = zmq_bind(receiver, "tcp://*:5558"); s_error(rc); assert(rc == 0); char *string = s_recv(receiver); free(string); int64_t start_time = s_clock(); int task_nbr; for (task_nbr = 0; task_nbr < 100; task_nbr++) { char *string = s_recv(receiver); free(string); if ((task_nbr / 10) * 10 == task_nbr) printf(":"); else printf("."); fflush(stdout); } printf("Total elapsed time: %d msec
    ", (int)(s_clock() - start_time)); zmq_close(receiver); zmq_ctx_destroy(ctx); return 0; } #endif // __LP_SERVER_H__ / // zhelpers.h /* ===================================================================== zhelpers.h Helper header file for example applications. ===================================================================== */ #ifndef __ZHELPERS_H_INCLUDED__ #define __ZHELPERS_H_INCLUDED__ // Include a bunch of headers that we will need in the examples #ifdef __cplusplus extern "C" { #endif #include //#include #include #include #include #include //#include #include //#include #include #include #include //#include #define random rand typedef INT64 int64_t; #define __WINDOWS__ // Version checking, and patch up missing constants to match 2.1 #if ZMQ_VERSION_MAJOR == 2 # error "Please upgrade to ZeroMQ/3.2 for these examples" #endif // Provide random number from 0..(num-1) #if (defined (__WINDOWS__)) # define randof(num) (int) ((float) (num) * rand () / (RAND_MAX + 1.0)) #else # define randof(num) (int) ((float) (num) * random () / (RAND_MAX + 1.0)) #endif // Receive 0MQ string from socket and convert into C string // Caller must free returned string. Returns NULL if the context // is being terminated. 
static char * s_recv (void *socket) { zmq_msg_t message; zmq_msg_init (&message); int size = zmq_msg_recv (&message, socket, 0); if (size == -1) return NULL; char *string = (char *)malloc (size + 1); memcpy (string, zmq_msg_data (&message), size); zmq_msg_close (&message); string [size] = 0; return (string); } // Convert C string to 0MQ string and send to socket static int s_send (void *socket, char *string) { zmq_msg_t message; zmq_msg_init_size (&message, strlen (string)); memcpy (zmq_msg_data (&message), string, strlen (string)); int size = zmq_msg_send (&message, socket, 0); zmq_msg_close (&message); return (size); } // Sends string as 0MQ string, as multipart non-terminal static int s_sendmore (void *socket, char *string) { zmq_msg_t message; zmq_msg_init_size (&message, strlen (string)); memcpy (zmq_msg_data (&message), string, strlen (string)); int size = zmq_msg_send (&message, socket, ZMQ_SNDMORE); zmq_msg_close (&message); return (size); } // Receives all message parts from socket, prints neatly // static void s_dump (void *socket) { puts ("----------------------------------------"); while (1) { // Process all parts of the message zmq_msg_t message; zmq_msg_init (&message); int size = zmq_msg_recv (&message, socket, 0); // Dump the message as text or binary char *data = (char *)zmq_msg_data (&message); int is_text = 1; int char_nbr; for (char_nbr = 0; char_nbr < size; char_nbr++) if ((unsigned char) data [char_nbr] < 32 || (unsigned char) data [char_nbr] > 127) is_text = 0; printf ("[%03d] ", size); for (char_nbr = 0; char_nbr < size; char_nbr++) { if (is_text) printf ("%c", data [char_nbr]); else printf ("%02X", (unsigned char) data [char_nbr]); } printf ("
    "); int64_t more; // Multipart detection more = 0; size_t more_size = sizeof (more); zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size); zmq_msg_close (&message); if (!more) break; // Last message part } } // Set simple random printable identity on socket // static void s_set_id (void *socket) { char identity [10]; sprintf (identity, "%04X-%04X", randof (0x10000), randof (0x10000)); zmq_setsockopt (socket, ZMQ_IDENTITY, identity, strlen (identity)); } // Sleep for a number of milliseconds static void s_sleep (int msecs) { #if (defined (__WINDOWS__)) Sleep (msecs); #else struct timespec t; t.tv_sec = msecs / 1000; t.tv_nsec = (msecs % 1000) * 1000000; nanosleep (&t, NULL); #endif } // Return current system clock as milliseconds static int64_t s_clock (void) { #if (defined (__WINDOWS__)) SYSTEMTIME st; GetSystemTime (&st); return (int64_t) st.wSecond * 1000 + st.wMilliseconds; #else struct timeval tv; gettimeofday (&tv, NULL); return (int64_t) (tv.tv_sec * 1000 + tv.tv_usec / 1000); #endif } // Print formatted string to stdout, prefixed by date/time and // terminated with a newline. static void s_console (const char *format, ...) { time_t curtime = time (NULL); struct tm *loctime = localtime (&curtime); char *formatted = (char *)malloc (20); strftime (formatted, 20, "%y-%m-%d %H:%M:%S ", loctime); printf ("%s", formatted); free (formatted); va_list argptr; va_start (argptr, format); vprintf (format, argptr); va_end (argptr); printf ("
    "); } static void s_error(int rc) { if (rc != 0) { const char *data = zmq_strerror(zmq_errno()); printf("%s
    ", data); } } #ifdef __cplusplus } #endif #endif // __ZHELPERS_H_INCLUDED__ // // main.cpp #include #include #include #include "lpserver.h" char g_str[] = "[option]
    \ --srr server_request_reply()
    \ --crr client_request_reply()
    \ --sps server_publish_subscribe()
    \ --cps client_publish_subscribe()
    \ --ts task_sink()
    \ --tv task_ventilator()
    \ --tw task_worker()
    "; void command(char *cmd) { if (cmd == NULL) { printf("%s", g_str); return; } if (strcmp(cmd, "--srr") == 0) { server_request_reply(); } else if (strcmp(cmd, "--crr") == 0) { client_request_reply(); } else if (strcmp(cmd, "--sps") == 0) { server_publish_subscribe(); } else if (strcmp(cmd, "--cps") == 0) { client_publish_subscribe(); } else if (strcmp(cmd, "--ts") == 0) { task_sink(); } else if (strcmp(cmd, "--tv") == 0) { task_ventilator(); } else if (strcmp(cmd, "--tw") == 0) { task_worker(); } else { printf("%s", g_str); } } int main(int argc, char **argv) { if (argc >= 2) command(argv[1]); else { char data[10]; memset(data, 0, 10); scanf("%s", data); command(data); } system("pause"); return 0; }