[ZeroMQ] multipart message
データは長いかもしれません.例えば、大きなファイルなので、データを複数のメッセージに分けて送信することができますが、論理的にはこれらのメッセージが1つの処理に統合されるべきであることがわかります.
小さなプログラムを書いて、fclientはfserverにファイルをアップロードします.
FSeverエンド:
Fclientエンド:
小さなプログラムを書いて、fclientはfserverにファイルをアップロードします.
FSeverエンド:
#include <assert.h>
#include <iostream>
#include <tchar.h>
#include <zmq.h>
using namespace std;
#pragma comment(lib,"libzmq.lib")
void recv_file(void *s)
{
zmq_msg_t msg;
//Recv
int rc;
rc = zmq_msg_init(&msg);
assert(0 == rc);
rc = zmq_recv(s,&msg,0);
const char *re_string;
re_string = (const char*)zmq_msg_data(&msg);
FILE *fp = fopen(re_string,"wb+");
if(!fp)
{
cout<<" ."<<endl;
return;
}
zmq_msg_close(&msg);
//Send
const char* se_string = "ok";
rc = zmq_msg_init_size(&msg,strlen(se_string)+1);
assert(0 == rc);
memcpy(zmq_msg_data(&msg),se_string,strlen(se_string)+1);
rc = zmq_send(s,&msg,0);
assert(0 == rc);
zmq_msg_close(&msg);
//Recv
INT64 more;
size_t more_size;
while(1)
{
rc = zmq_msg_init(&msg);
assert(0 == rc);
rc = zmq_recv(s,&msg,0);
assert(0 == rc);
// re_string = (const char*)zmq_msg_data(&msg);
// cout<<re_string<<endl;
fwrite((const char*)zmq_msg_data(&msg),zmq_msg_size(&msg),1,fp);
zmq_msg_close(&msg);
zmq_getsockopt(s,ZMQ_RCVMORE,&more,&more_size);
if(!more)
break;
}
//Send
rc = zmq_msg_init_size(&msg,strlen(se_string)+1);
assert(0 == rc);
memcpy(zmq_msg_data(&msg),se_string,strlen(se_string)+1);
rc = zmq_send(s,&msg,0);
assert(0 == rc);
zmq_msg_close(&msg);
//Close file
fclose(fp);
puts("done.");
}
void main(int argc,TCHAR*argv[])
{
void *ctx;
ctx = zmq_init(1);
assert(ctx);
void *s;
s = zmq_socket(ctx,ZMQ_REP);
assert(s);
int rc;
rc = zmq_bind(s,"tcp://*:5555");
assert(0 == rc);
while(1)
{
recv_file(s);
}
}
Fclientエンド:
#include <assert.h>
#include <iostream>
#include <tchar.h>
#include <zmq.h>
using namespace std;
#pragma comment(lib,"libzmq.lib")
#define ONE_BUFFER_SIZE 1024
void myFree(void *data,void *hint)
{
cout<<"MyFree"<<endl;
free(data);
}
void main(int argc,char*argv[])
{
const char* se_string = "111.bmp";
FILE *fp = fopen(se_string,"rb");
if(!fp)
{
cout<<" ."<<endl;
return;
}
//Init ctx and socket
void *ctx;
ctx = zmq_init(1);
assert(ctx);
void *s;
s = zmq_socket(ctx,ZMQ_REQ);
assert(s);
int rc;
rc = zmq_connect(s,"tcp://127.0.0.1:5555");
assert(0 == rc);
//Send
zmq_msg_t msg;
rc = zmq_msg_init_size(&msg,strlen(se_string)+1);
assert(0 == rc);
memcpy(zmq_msg_data(&msg),se_string,strlen(se_string)+1);
rc = zmq_send(s,&msg,0);
assert(0 == rc);
zmq_msg_close(&msg);
//Recv
rc = zmq_msg_init(&msg);
assert(0 == rc);
rc = zmq_recv(s,&msg,0);
assert(0 == rc);
const char* re_string;
re_string = (const char*)zmq_msg_data(&msg);
cout<<re_string<<endl;
zmq_msg_close(&msg);
//Send
char *pBuf;
int n = 0;
while(1)
{
pBuf = (char*)malloc(ONE_BUFFER_SIZE);
n = fread(pBuf,1,ONE_BUFFER_SIZE,fp);
if(n < ONE_BUFFER_SIZE)
{
cout<<"client < 1024"<<endl;
// rc = zmq_msg_init_size(&msg,strlen(se_string)+1);
// assert(0 == rc);
//
// memcpy(zmq_msg_data(&msg),se_string,strlen(se_string)+1);
rc = zmq_msg_init_data(&msg,pBuf,ONE_BUFFER_SIZE,myFree,NULL);
assert(0 == rc);
rc = zmq_send(s,&msg,0);
assert(0 == rc);
zmq_msg_close(&msg);
break;
}
// rc = zmq_msg_init_size(&msg,strlen(se_string)+1);
// assert(0 == rc);
//
// memcpy(zmq_msg_data(&msg),se_string,strlen(se_string)+1);
rc = zmq_msg_init_data(&msg,pBuf,ONE_BUFFER_SIZE,myFree,NULL);
assert(0 == rc);
rc = zmq_send(s,&msg,ZMQ_SNDMORE);
assert(0 == rc);
zmq_msg_close(&msg);
}
//Recv
rc = zmq_msg_init(&msg);
assert(0 == rc);
rc = zmq_recv(s,&msg,0);
assert(0 == rc);
re_string = (const char*)zmq_msg_data(&msg);
cout<<re_string<<endl;
zmq_msg_close(&msg);
//Close ctx and socket
zmq_close(s);
zmq_term(ctx);
fclose(fp);
puts("done.client");
}