Linux:MQTT通信プロトコルの6--pahoを作成する.mqtt.c簡単なCルーチン(非同期関数)

46720 ワード

ひどうきかんすう
非同期関数の利点は、非ブロックで実行することですが、同期関数の構造よりもやや複雑です.非同期方式は、まず対応する構造体を設定し、その後、対応する操作を開始し、これらの操作のバックグラウンドで実行が完了すると、相構造体の成功または失敗関数を呼び出し、これらの呼び出された関数の中で次の操作を実行することができます.
サブスクリプション・エンド
  プログラムは最初から私たちが定義したマクロに基づいてインスタンスを作成し、コールバック関数を設定し、次に「接続」の構造体を定義してユーザーパスワード、関数ポインタを埋め込み、この構造体はファイルハンドルをcontextとしてonConnectSuccessなどの関数として取得します.接続に成功した関数にはファイルハンドルがトピックを購読する必要があるからです.トピックを購読すると、サーバ側はトピックに基づいて一致するメッセージを送信し、メッセージが来ると上に設定したコールバック関数msgarrvdを呼び出し、その関数でメッセージの内容を取得して対応する操作を実行することができます.
#include 
#include 
#include 
#include 
#include "MQTTAsync.h"


#define ADDRESS     "tcp://127.0.0.1:2020"
#define CLIENTID    "Rookie_sub"
#define TOPIC       "test/+"
#define USERNAME	"user_test"
#define PASSWD		"123456"
#define QOS         2
#define KEEPALIVE	20

int g_iRunFlag = 1;


void onConnectSuccess(void* context, MQTTAsync_successData* response);
void onConnectFailure(void* context, MQTTAsync_failureData* response);
void onSubscribeSuccess(void* context, MQTTAsync_successData* response);
void onSubscribeFailure(void* context, MQTTAsync_failureData* response);
int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message);
void onDisconnectSuccess(void* context, MQTTAsync_successData* response);
void onDisconnectFailure(void* context, MQTTAsync_failureData* response);


int main(int argc, char* argv[])
{
	MQTTAsync client;
	MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
	int rc;

	//   NQTT  
	if ((rc = MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL))
			!= MQTTASYNC_SUCCESS)
	{
		printf("Failed to create client, return %d
"
, rc); MQTTAsync_destroy(&client); return -1; } // , : 、 、 、 、 if ((rc = MQTTAsync_setCallbacks(client, &client, NULL, msgarrvd, NULL)) != MQTTASYNC_SUCCESS) { printf("Failed to set callbacks, return %d
"
, rc); MQTTAsync_destroy(&client); return -1; } // conn_opts.context = client; // onConnectxxx conn_opts.username = USERNAME; conn_opts.password = PASSWD; conn_opts.keepAliveInterval = KEEPALIVE; conn_opts.cleansession = 1; conn_opts.automaticReconnect = 1; conn_opts.onSuccess = onConnectSuccess; conn_opts.onFailure = onConnectFailure; if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) { printf("Failed to start connect, return %d
"
, rc); MQTTAsync_destroy(&client); return -1; } while(g_iRunFlag) { // do something sleep(2); } // MQTT MQTTAsync_destroy(&client); return 0; } void onConnectSuccess(void* context, MQTTAsync_successData* response) { MQTTAsync client = (MQTTAsync)context; MQTTAsync_responseOptions resp_opts = MQTTAsync_responseOptions_initializer; int rc; printf("Connect succeeded!
"
); resp_opts.onSuccess = onSubscribeSuccess; resp_opts.onFailure = onSubscribeFailure; if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &resp_opts)) != MQTTASYNC_SUCCESS) { printf("Failed to start subscribe, return %d
"
, rc); exit(-1); } } void onConnectFailure(void* context, MQTTAsync_failureData* response) { printf("Connect failed, return %d
"
, response->code); exit(-1); } void onSubscribeSuccess(void* context, MQTTAsync_successData* response) { printf("Subscribe succeeded!
"
); } void onSubscribeFailure(void* context, MQTTAsync_failureData* response) { printf("Subscribe failed, return %d
"
, response->code); exit(-1); } int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message) { MQTTAsync client = (MQTTAsync)context; MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer; int rc; printf("Recieve a message: (topic)%s, (msg)%s
"
, topicName, (char*)message->payload); if(0 == strcmp(message->payload, "quit")) { disc_opts.onSuccess = onDisconnectSuccess; disc_opts.onFailure = onDisconnectFailure; if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS) { printf("Failed to start disconnect, return %d
"
, rc); exit(-1); } } MQTTAsync_freeMessage(&message); MQTTAsync_free(topicName); return 1; } void onDisconnectSuccess(void* context, MQTTAsync_successData* response) { printf("Disconnect succeeded!
"
); g_iRunFlag = 0; } void onDisconnectFailure(void* context, MQTTAsync_failureData* response) { printf("Disconnect failed, rc %d
"
, response->code); exit(-1); }

パブリッシング
パブリケーション側のプログラム構造もサブスクリプション側に似ており、非同期である以上、構造体充填関数ポインタを定義してから、対応する操作を起動してバックグラウンドで実行します.プログラムでは、MQTTインスタンス接続サーバを作成した後、メッセージを発行すると接続が切断されます.
#include 
#include 
#include 
#include 
#include "MQTTAsync.h"


#define ADDRESS     	"tcp://127.0.0.1:2020"
#define CLIENTID    	"Rookie_pub"
#define USERNAME		"user_test"
#define PASSWD			"123456"
#define TOPIC       	"test/common"
#define PAYLOAD     	"hello"
#define WILL_TOPIC		"test/will"
#define WILL_PAYLOAD	"pub_will_message"
#define QOS         	2
#define KEEPALIVE		20

int g_iRunFlag = 1;

void onConnectSuccess(void* context, MQTTAsync_successData* response);
void onConnectFailure(void* context, MQTTAsync_failureData* response);
void onSendSuccess(void* context, MQTTAsync_successData* response);
void onSendFailure(void* context, MQTTAsync_failureData* response);
void onDisconnectSuccess(void* context, MQTTAsync_successData* response);
void onDisconnectFailure(void* context, MQTTAsync_failureData* response);


int main(int argc, char* argv[])
{
	MQTTAsync client;
	MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
	MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer;
	
	int rc;

	//   NQTT  
	if ((rc = MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL))
			!= MQTTASYNC_SUCCESS)
	{
		printf("Failed to create client, return %d
"
, rc); MQTTAsync_destroy(&client); return -1; } // conn_opts.context = client; conn_opts.username = USERNAME; conn_opts.password = PASSWD; conn_opts.keepAliveInterval = KEEPALIVE; conn_opts.cleansession = 1; conn_opts.onSuccess = onConnectSuccess; conn_opts.onFailure = onConnectFailure; conn_opts.will = &will_opts; will_opts.topicName = WILL_TOPIC; will_opts.payload.data = WILL_PAYLOAD; will_opts.payload.len = (int)strlen(WILL_PAYLOAD); if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) { printf("Failed to start connect, return %d
"
, rc); MQTTAsync_destroy(&client); return -1; } while(g_iRunFlag) { // do something sleep(2); } MQTTAsync_destroy(&client); return 0; } void onConnectSuccess(void* context, MQTTAsync_successData* response) { MQTTAsync client = (MQTTAsync)context; MQTTAsync_message mesg_opts = MQTTAsync_message_initializer; MQTTAsync_responseOptions resp_opts = MQTTAsync_responseOptions_initializer; int rc; printf("Successful connection
"
); mesg_opts.payload = PAYLOAD; mesg_opts.payloadlen = (int)strlen(PAYLOAD); mesg_opts.qos = QOS; mesg_opts.retained = 0; resp_opts.context = client; resp_opts.onSuccess = onSendSuccess; resp_opts.onFailure = onSendFailure; if ((rc = MQTTAsync_sendMessage(client, TOPIC, &mesg_opts, &resp_opts)) != MQTTASYNC_SUCCESS) { printf("Failed to start sendMessage, return %d
"
, rc); exit(-1); } } void onConnectFailure(void* context, MQTTAsync_failureData* response) { printf("Connect failed, return %d
"
, response->code); exit(-1); } void onSendSuccess(void* context, MQTTAsync_successData* response) { MQTTAsync client = (MQTTAsync)context; MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer; int rc; printf("%d messages have been published
"
, response->token); sleep(2); disc_opts.onSuccess = onDisconnectSuccess; disc_opts.onFailure = onDisconnectFailure; if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS) { printf("Failed to start disconnect, return %d
"
, rc); exit(-1); } } void onSendFailure(void* context, MQTTAsync_failureData* response) { printf(" Send message failed, return %d
"
, response->code); exit(-1); } void onDisconnectSuccess(void* context, MQTTAsync_successData* response) { printf("Disconnect succeeded
"
); g_iRunFlag = 0; } void onDisconnectFailure(void* context, MQTTAsync_failureData* response) { printf("Disconnect failed, return %d
"
, response->code); exit(-1); }

コンパイル
同期関数のコンパイル方法と同様に、依存するダイナミックライブラリに差があり、非同期関数はlibpaho-mqtt 3 aを用いる.so.
gcc 002mysub.c -o 002mysub -I/work/system/paho.mqtt.c-master/src/ -lpaho-mqtt3a -L/work/system/paho.mqtt.c-master/build/output/
gcc 002mypub.c -o 002mypub -I/work/system/paho.mqtt.c-master/src/ -lpaho-mqtt3a -L/work/system/paho.mqtt.c-master/build/output/

実行環境の構成
同期関数の実行環境と同様に、ダイナミックライブラリを構成するパスが必要です.そうしないと実行できません.
export LD_LIBRARY_PATH=/work/system/paho.mqtt.c-master/build/output