Watson IoT Platform - MQTT - CleanSessionの確認


初めてWatson IoT Platformを使ってみよう、というかたのためのガイドです。
Watson IoT Platformを使ってみるでIoTアプリを準備しました。
MessageSightを動かすでMQTTブローカーとしてMessageSightを準備しました。
Node-REDをローカル環境で稼働させる
Watson IoT Platform - MQTT - Node-REDからpub/subするでNode-REDからpub/subできるようにしました。

CleanSession=false(durable subscription)機能の確認

  • MQTTのCleanSession=false(durable subscription)機能により、サブスクライバー(受信者)は接続時にCleanSession=falseを指定することにより、自分が非接続中に配信されたメッセージについても再接続時に受信することができるようになります。
  • Node-REDのフローをコピーする方法を参照して以下のフローをコピペし、MQTTノードのアドレスを自環境に合わせます。
CleanSession=falseの確認
[{"id":"42b340e4.dc4bd","type":"mqtt-broker","z":"375326d0.fda38a","broker":"192.168.100.100","port":"1883","clientid":"","usetls":false,"verifyservercert":true,"compatmode":true,"keepalive":"60","cleansession":true,"willTopic":"","willQos":"0","willRetain":null,"willPayload":"","birthTopic":"","birthQos":"0","birthRetain":null,"birthPayload":""},{"id":"418d3097.c29d28","type":"mqtt out","z":"375326d0.fda38a","name":"","topic":"/abc/def/ghi","qos":"","retain":"","broker":"42b340e4.dc4bd","x":250,"y":520,"wires":[]},{"id":"fec77fc6.405f2","type":"inject","z":"375326d0.fda38a","name":"","topic":"","payload":"こんにちは","payloadType":"str","repeat":"","crontab":"","once":false,"x":100,"y":520,"wires":[["418d3097.c29d28"]]}]

  • 下記のプログラムを使ってサブスクライブします。
CleanSession=falseの確認
#!/usr/bin/python
# -*- coding: utf-8 -*-

import paho.mqtt.client as mqtt

topic = "/abc/def/ghi"
clientID = "subTester"
broker = "192.168.100.100"

def on_connect(client, userdata, flags, rc):
    print("connected")
    client.subscribe(topic)

def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))

mqttc = mqtt.Client(clientID,clean_session=0)

mqttc.on_connect = on_connect
mqttc.on_message = on_message

mqttc.connect(host=broker, port=1883, keepalive=60)

mqttc.loop_forever()

  • このプログラムを実行して、MQTTブローカーに対して接続しいったん/abc/def/ghiトピックへサブスクライブします。
  • Node-REDのフローからこの/abc/def/ghiトピックへメッセージを送信し、メッセージがこのプログラム(サブスクライバー)によって受信される事を確認します。
  • 次にこのプログラムをいったん終了します。
  • 再度Node-REDのフローを用いて(必要に応じてinjectノードから送信される文字列を「こんばんは」などへ更新するなどして)、/abc/def/ghiトピックへメッセージを送信します。

  • 上記のサブスクライバープログラムを再度実行し、再接続時に保存されていたメッセージが受信される事を確認します。