Linux:MQTT通信プロトコルの4--mosquittoを記述するCルーチン(非同期関数)

43545 ワード

前の記事では、非同期関数と同期関数の比較について簡単に言及したが、非同期関数は非ブロックであるため、同期関数よりも性能がやや優れているため、MQTTの通信を実現するために非同期関数もよく使われている.非同期関数と同期関数の違いは,接続サーバにおけるconnect関数,loopループ関数である.では、loop関数の呼び出し方法を簡単に探ってみましょう.同期関数はmosquittoを呼び出すことです.loop関数は、実装を待つ通信をブロックする.ソースコードを見ると、非同期方式の「loop」関数は、同期方式でブロック待ちを招くmosquitto_を完了するためのスレッドを作成することであることがわかります.loop関数は、次のように呼び出されます.
mosquitto_loop_start(mosq);		//      loop
	pthread_create(&mosq->thread_id, NULL, mosquitto__thread_main, mosq)
		mosquitto_loop_forever(mosq, 1000*86400, 1);
			mosquitto_loop(mosq, timeout, max_packets);		//      loop

mosquitto_loop_stop(mosq, false);
	pthread_cancel(mosq->thread_id);
	pthread_join(mosq->thread_id, NULL);

同期非同期関数の呼び出しの違いを理解したら、非同期接続サーバ関数mosquitto_を見てみましょう.connect_asyncの公式説明:
Connect to an MQTT broker. This is a non-blocking call. If you use mosquitto_connect_async your client must use the threaded interface mosquitto_loop_start. If you need to use mosquitto_loop, you must use mosquitto_connect to connect the client. May be called before or after mosquitto_loop_start.
次に、connect関数とloop関数のマッチング関係に基づいて、非同期通信プログラムを記述できます.
サブスクリプション・エンド
#include 
#include 
#include 
#include 
#include "mosquitto.h"


/*        */
#define DEBUG_PROCESS   printf
#define DEBUG_ERROR     printf
#define DEBUG_MSG       printf


//               
static int g_iRunFlag = 1;


/*                 ,mosquitto_xx          */
void on_connect(struct mosquitto *mosq, void *obj, int rc);
void on_disconnect(struct mosquitto *mosq, void *obj, int rc);
void on_subscribe(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos);
void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg);


int main()
{
        int ret;
        struct mosquitto *mosq;

        //    mosquitto 
        ret = mosquitto_lib_init();
        if(ret){
                DEBUG_ERROR("Init lib error!
"
); return -1; } // // :id( NULL)、clean_start、 mosq = mosquitto_new("Rookie_sub", true, NULL); if(mosq == NULL){ DEBUG_ERROR("New error!
"
); mosquitto_lib_cleanup(); return -1; } // :, // : 、 、 ret = mosquitto_username_pw_set(mosq, "user_test", "123456"); if(ret){ DEBUG_ERROR("Set username and password error!
"
); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); return -1; } // // : 、 mosquitto_connect_callback_set(mosq, on_connect); mosquitto_disconnect_callback_set(mosq, on_disconnect); mosquitto_subscribe_callback_set(mosq, on_subscribe); mosquitto_message_callback_set(mosq, on_message); // // : 、ip(host)、 、 // ret = mosquitto_connect_async(mosq, "127.0.0.1", 2020, 60); ret = mosquitto_connect_async(mosq, "127.0.0.1", 2020, 60); if(ret){ DEBUG_ERROR("Connect server error!
"
); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); return -1; } ret = mosquitto_loop_start(mosq); if(ret){ DEBUG_ERROR("Start loop error!
"
); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); return -1; } // : 、 g_iRunFlag DEBUG_PROCESS("Start!
"
); while(g_iRunFlag) { //mosquitto_loop(mosq, -1, 1); sleep(1); } // mosquitto_loop_stop(mosq, false); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); DEBUG_PROCESS("End!
"
); return 0; } void on_connect(struct mosquitto *mosq, void *obj, int rc) { DEBUG_PROCESS("Call the function: on_connect
"
); if(rc){ // , DEBUG_ERROR("on_connect error!
"
); exit(1); }else{ // // : 、id、 、qos if(mosquitto_subscribe(mosq, NULL, "test/+", 2)){ DEBUG_ERROR("Set the topic error!
"
); exit(1); } } } void on_disconnect(struct mosquitto *mosq, void *obj, int rc) { DEBUG_PROCESS("Call the function: on_disconnect
"
); g_iRunFlag = 0; } void on_subscribe(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos) { DEBUG_PROCESS("Call the function: on_subscribe
"
); } void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg) { DEBUG_PROCESS("Call the function: on_message
"
); DEBUG_MSG("Recieve a message: %s
"
, (char *)msg->payload); if(0 == strcmp(msg->payload, "quit")){ mosquitto_disconnect(mosq); } }

パブリッシング
#include 
#include 
#include 
#include 
#include "mosquitto.h"

#define DEBUG_PROCESS   printf
#define DEBUG_ERROR     printf
#define DEBUG_MSG       printf

static int g_iRunFlag = 1;

void on_connect(struct mosquitto *mosq, void *obj, int rc);
void on_disconnect(struct mosquitto *mosq, void *obj, int rc);
void on_publish(struct mosquitto *mosq, void *obj, int mid);


int main()
{
        int ret;
        struct mosquitto *mosq;

        ret = mosquitto_lib_init();
        if(ret){
                DEBUG_ERROR("Init lib error!
"
); return -1; } mosq = mosquitto_new("Rookie_pub", true, NULL); if(mosq == NULL){ DEBUG_ERROR("New error!
"
); mosquitto_lib_cleanup(); return -1; } // MQTT // : 、 、 、 、qos、 ret = mosquitto_will_set(mosq, "test/will", strlen("pub_will_message"), "pub_will_message", 2, false); if(ret){ DEBUG_ERROR("Set will message error!
"
); return -1; } ret = mosquitto_username_pw_set(mosq, "user_test", "123456"); if(ret){ DEBUG_ERROR("Set username and password error!
"
); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); return -1; } mosquitto_connect_callback_set(mosq, on_connect); mosquitto_disconnect_callback_set(mosq, on_disconnect); mosquitto_publish_callback_set(mosq, on_publish); ret = mosquitto_loop_start(mosq); if(ret){ DEBUG_ERROR("Start loop error!
"
); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); return -1; } // ret = mosquitto_connect(mosq, "127.0.0.1", 2020, 60); ret = mosquitto_connect_async(mosq, "127.0.0.1", 2020, 60); if(ret){ DEBUG_ERROR("Connect server error!
"
); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); return -1; } DEBUG_PROCESS("Start!
"
); while(g_iRunFlag) { //mosquitto_loop(mosq, -1, 1); sleep(1); } mosquitto_loop_stop(mosq, false); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); DEBUG_PROCESS("End!
"
); return 0; } void on_connect(struct mosquitto *mosq, void *obj, int rc) { DEBUG_PROCESS("Call the function: on_connect
"
); if(rc){ DEBUG_ERROR("on_connect error!
"
); exit(1); }else{ // : 、id、 、 、 、Qos、 if(mosquitto_publish(mosq, NULL, "test/common", strlen("hello"), "hello", 2, false)){ DEBUG_ERROR("Set the topic error!
"
); exit(1); } } } void on_disconnect(struct mosquitto *mosq, void *obj, int rc) { DEBUG_PROCESS("Call the function: on_disconnect
"
); g_iRunFlag = 0; } void on_publish(struct mosquitto *mosq, void *obj, int mid) { DEBUG_PROCESS("Call the function: on_publish
"
); sleep(2); mosquitto_disconnect(mosq); }