JavaでMQTT入門 ローカル環境でPubSubの動作を検証してみる


MQTTとは

MQTTとはM2M(Machine To Machine)の通信において、軽量な情報を頻繁にやりとりするために適したプロトコル。通信のやりとりはパブリッシュ/サブスクライブ型で行われ、「TOPIC」というメッセージを識別する仕組みによって、多対多でのメッセージのやりとりが可能となっている。

構成

MQTTを使った通信は、パブリッシャーブローカーサブスクライバーの3要素によって成り立っている。

  • パブリッシャー
    ブローカーに向けてメッセージを送信する側のプログラム。

  • ブローカー
    パブリッシャーから受け取ったメッセージを保管し、サブスクライバーへ受け渡す仲介役
    メッセージが正常に送信されたか、受信されたかのステータスを管理する役割も担う

  • サブスクライバー
    ブローカーからメッセージを受信する側のプログラム

簡単なイメージ図

トピック

トピックとはパブリッシャーがブローカーに対して送信するメッセージに付与する識別子のようなもので、サブスクライバーはトピックを指定することによってブローカーからメッセージを取得する。つまり、サブスクライバーから見た時には、メッセージの送信者を意識することなくメッセージのやりとりが行われる。複数のパブリッシャーが同じトピックでメッセージを送ることも可能であり、その場合はサブスクライバーは両パブリッシャーからのメッセージを受信することになる。このように、トピックという仕組みによってMQTTでは多対多の通信が行われる。

トピックの例

/customerA/message_version1/machineOne/operationData/
/customerA/messege_version1/machineOne/eventData/
/customerA/message_version1/machineTwo/operationData/
/customerA/message_version1/machineTwo/eventData/

上記のトピックをパブリッシャーが送信してきた場合、サブスクライバーは以下のようにトピックを指定してメッセージを受信できる。

  1. 完全一致
    /customerA/message_version1/machineOne/operationData/
  2. 前方一致
    /customerA/message_version1/*
    →機械に関係なく同一のmessage_versionのメッセージのみ取得する
  3. 部分一致
    /customerA/message_version1/+/operationData/
    →機械に関係なく同一のmessage_versionのoperationDataのみ受信する

QoS

MQTTは不安定な通信環境でのメッセージのやりとりを担保するための仕組みとして、QoS(Quality of Service)という仕組みが備わっている。QoSはパブリッシャーとブローカー間、ブローカーとサブスクライバー間でそれぞれ設定するものであるため、各通信で異なったQoSのレベルを設定することも可能である。

ここでは、MQTT Version 3.1.1の仕様に基づいて、QoSの仕組みを整理する。

QoSのレベル

  1. QoS 0
    メッセージは最高1回配信されるが、送信先に届くかどうかは保証されない
  2. QoS 1
    メッセージは少なくとも1回配信され、送信先への到達が保証されるが、重複して送信される可能性がある
  3. QoS 2
    メッセージは過不足なく1回のみ送信先への到達が保証される

QoSの担保方法

QoS 1


QoS1ではメッセージは少なくとも1回配信されることが保証される。例えば、パブリッシャーとブローカー間の通信において、パブリッシャーがQoS1を設定した場合、メッセージを受け取ったブローカーはPUBACKというパケットをヘッダーに付与し、パブリッシャーにレスポンスメッセージを送る。
パブリッシャーは一定時間内にPUBACKを受け取らなかった場合、メッセージが正常に配信されなかったと判断し同一のメッセージを再送する。

イメージ
1. パブリッシャー -----PUBLISH----> ブローカー
2. パブリッシャー <----PUBACK------ ブローカー

QoS 2


QoS2はMQTTで設定できる最も高いレベルでの通信保証の仕組みで、必ず1回のメッセージの配信が保証される。
パブリッシャーとブローカー間の通信において、パブリッシャーがQoS2を設定した場合、ブローカーはPUBLECというパケットをヘッダーに付与し、パブリッシャーにレスポンスメッセージを送る。同時にPUBLECに付与したパケットIDをブローカー内に保持する。パブリッシャー側は、ブローカーからPUBLECを受信するとPUBRELというパケットをヘッダーに付与し、ブローカーにレスポンスメッセージを送る。そして最後にブローカーはPUBRELを受け取った際に保持していたPUBRECのパケットIDも削除し、すべての通信が正常に完了したことをPUBCOMPというパケットをヘッダーに含めてレスポンスする。パブリッシャー側もPUBCOMPを受け取ったことにより、保持していたすべての情報を破棄し通信プロセスを完了する。

各メッセージが通信の途中で失われ、メッセージの送信者が一定時間内にレスポンスを受け取れなかった場合、そのメッセージの送信者が最後に送ったメッセージを再送する。

QoS1との違いは、通信が失われた場合の再送メッセージが、最初に送ったPUBLISHメッセージではなく、各プロセス段階で送られるPUBRECやPUBRELであること。この仕組みによって一つのPUBLISHメッセージが過不足なく1回のみ送信先へ配送されることを保証している。

イメージ
1. パブリッシャー -----PUBLISH----> ブローカー
2. パブリッシャー <----PUBREC------ ブローカー
3. パブリッシャー -----PUBREL-----> ブローカー
4. パブリッシャー <----PUBCOMP----- ブローカー

QoSの注意点

QoSはパブリッシャーとサブスクライバーの通信保証を行うものではない


QoSはあくまでパブリッシャーとブローカー、ブローカーとサブスクライバーとのメッセージ配信を保証するものである。そのため、パブリッシャーがQoS2を設定してメッセージを配信したとしても、それはサブスクライバーに過不足なく1回配信が保証されるものではない。サブスクライバーに過不足なく1回配信されるためには、パブリッシャーとサブスクライバーの両者がQoS2を指定する必要がある。

PacketIDは各クライアント毎にユニークであり、有限である。


QoS1のPUBACKや、QoS2のPUBREC、PUBREL、PUBCOMPにおいて、ヘッダーに付与されるパケットIDは通信を行っている各クライアントごとにユニークである。また、その空間はヘッダー部には8bitしかないため、最大で同時に65535のクライアントととしかメッセージ配信の保証はできないことに注意が必要。

ベストプラクティス

QoS0に向いている通信

  1. 通信環境が安定している
  2. 送信メッセージが失われても問題がない
  3. 送信メッセージサイズが小さく、通信間隔が短い

QoS1に向いている通信

  1. あらゆるメッセージを受け取る必要があるとき
  2. QoS2ほどのオーバーヘッドに耐えられないとき

QoS2に向いている通信

  1. 必須で1回のみの通信でなければならないという厳しい要件のとき

Retain機能

MQTTのメッセージやりとりはパブリッシュ/サブスクライブ型で行われ、各クライアントの通信をブローカーが仲介することとなるため、パブリッシュ側から見た時に確実にサブスクライブ側へメッセージが伝達されることは保証されない。
また、パブリッシュ側がブローカーと接続しているか(生きているか)否かも、同様の理由でサブスクライブ側からは知ることができない。
このような欠点を補う機能として、Retainがある。

Retainとは

Retainとは、パブリッシュからブローカーに対して送られた最後のメッセージを保存しておく機能で、パブリッシュ側がメッセージを送る際にフラグをセットすることで実現される。サブスクライバ側はブローカーに接続した際に、Retainされたメッセージを受け取ることができる。

Retainの範囲

  • Retainは一つのTopicに対してユニークとなるため、同一のTopicでメッセージが複数送られた場合は、最後のメッセージが保存される
  • サブスクライバ側はTopicに対して完全一致、部分一致、前方一致の全ての取得方法を指定可能
  • Retainされたメッセージはクライアント側から明示的な指示がない限りは残り続ける

Retainの活用方法

  1. 通信が不安定な状況での一斉配信
    サブスクライブ側のクライアントが常にブローカーと接続できない場合に、接続可能となった段階でメッセージを受け取る
  2. サブスクライバー側の死活管理
    パブリッシャ側が一定期間置きにメッセージを送る仕様であった場合に、サブスクライバ側がRetainされたメッセージの送信時刻と現在時刻を比較することで、パブリッシャ側の状態を推測することが可能

Last Will And Testament (LWT)

MQTTは不安定な通信環境を前提として作られたプロトコルなため、予め設定をしておくことで各クライアントとブローカー間の接続が異常に終了した場合に、そのTOPICを受信しているサブスクライバに対して異常終了したことを一斉に伝える機能として、LWTが準備されている。

異常な通信終了とは

  • サーバーがI/Oエラーやネットワーク異常を検知した場合
  • クライアントとブローカーで通信が発生している際に、指定期間ないの応答がなくなった場合
  • 明示的な通信終了を示すパケットを受け取らずに接続が終了した場合
  • プロトコルエラーでサーバーのネットワーク接続が終了した場合

MQTT と TLS

MQTTの通信は暗号化されていない。セキュアな環境で通信を行うための方法として、多くのMQTTブローカーはTLSによる通信手段を準備している。一般的には8883ポートがTLS用のデフォルトポートとなっている。
MQTTブローカーへの接続に対しては、ユーザー名とパスワードを使った認証も可能であるが、よりセキュアに通信を行いたい場合は、TLSを使ったほうがよい。

サンプルプログラム

ブローカー

ブローカーはQoSや機能によって差があるが、MosquittoやApolloなど様々なOSSのブローカーが存在する。ここのサイトに機能比較が載ってる。
今回は最も代表的なMosquittoを使ってMQTTを動かしてみる。

インストールは以下のサイトを参考のこと。
- ダウンロードサイト :Mosquitto.org
- Windows編 : Windows 7 64bitにMQTT(Mosquitto)を入れるメモ
- Linux編 : MosquittoをCentOSにインストールする手順

私はMacの環境で行ったので、以下のコマンドでインストール。

コマンド
brew install mosquitto
export PATH="$PATH:/usr/local/opt/mosquitto/sbin/"

パブリッシャー / サブスクライバー

実装にはPahoを使用。サンプルを実行可能な状態でGithubに置いているので、参考まで。
以下にはPublisherとSubscriberの箇所のみ抜き出して記載する。

必要な設定等は以下のようにyamlに定義してあるとします。

mqtt-config.yaml
broker: "tcp://localhost:1883"
publish:
  topic: "top/second/third"
  qos: 2
  clientId: "Publisher01"
subscribe:
  topic: "top/second/third"
  qos: 2
  clientId: "Subscriber01"

パブリッシャーはブローカーに対してメッセージを受け取る側なので、引数で受け取ったメッセージをブローカーに送る。ここで重要なのはtopicに指定する内容で、パブリッシャーとサブスクライバーで同じTopicを指定することでブローカーを通じたメッセージのやりとりが可能になる。上記で載せたように、Topicにはワイルドカードも指定可能。

MqttPublisher.java
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import commons.ConfigManager;

public class MqttPublisher {
    private final static Logger L = LoggerFactory.getLogger(MqttPublisher.class);

    public static void main(String[] args) {

        //Publish設定
        final String broker       = ConfigManager.getBrokerConfig();
        final String topic        = String.valueOf(ConfigManager.getPubSubConfig("publish", "topic"));
        final int qos             = (int) ConfigManager.getPubSubConfig("publish", "qos");
        final String clientId     = String.valueOf(ConfigManager.getPubSubConfig("publish", "clientId"));
        //Publishするメッセージ内容
        String content      = args[0];

        try {
            MqttClient mqttClient = new MqttClient(broker, clientId, new MemoryPersistence());
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(false);

            L.info("Connecting to broker: {}", broker);
            mqttClient.connect(connOpts);

            L.info("Connected and publishing message: qos -> {}, message -> {}", qos, content);
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            mqttClient.publish(topic, message);

            L.info("Message published and Disconneting broker");
            mqttClient.disconnect();
            L.info("Disconnected");

            System.exit(0);
        } catch(MqttException me) {
            L.error("reason: {} ", me.getReasonCode());
            L.error("message: {} ", me.getMessage());
            L.error("localize: {}", me.getLocalizedMessage());
            L.error("cause: {} ", me.getCause());
            L.error("exception: {}", me);
        }
    }

サブスクライバー側はブローカーからメッセージを受け取る側。パブリッシャーと同じTopicを指定することでメッセージを受け取りる。後々、QoSの確認で異常終了が行えるように標準入力を受け取るまでプログラムが起動し続けるようにしている。

MqttSubscriber.java
package mqtt.subscribe;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import commons.ConfigManager;

public class MqttSubscriber  implements MqttCallback {
    private final static Logger L = LoggerFactory.getLogger(MqttSubscriber.class);

    /**
     * MQTTブローカーとの接続を失った時に呼び出される.
     */
    @Override
    public void connectionLost(Throwable cause) {
        L.warn("Connection lost!");
        //再接続がしたかったらここに処理を書く
        System.exit(1);
    }

    /**
     * メッセージの送信が完了したときに呼ばれるCallback.
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        //Subscribe側からは呼び出されない?
    }

    /**
     * メッセージを受信したときに呼ばれる。
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws MqttException {
        L.info("Message arrived");
        L.info("Topic:", topic);
        L.info("Message: " + new String(message.getPayload()));
    }

    public static void main(String[] args) throws InterruptedException {
        try {
            MqttSubscriber subscriber = new MqttSubscriber();
            subscriber.subscribe();
        } catch(MqttException me) {
            L.error("reason: {} ", me.getReasonCode());
            L.error("message: {} ", me.getMessage());
            L.error("localize: {}", me.getLocalizedMessage());
            L.error("cause: {} ", me.getCause());
            L.error("exception: {}", me);
        }
    }

    /**
     * メッセージを受信する.
     * 標準入力があるまで接続し続ける.
     * 
     * @throws MqttException
     * @throws InterruptedException 
     */
    public void subscribe() throws MqttException, InterruptedException {

        //Subscribe設定
        final String broker       = ConfigManager.getBrokerConfig();
        final String topic        = String.valueOf(ConfigManager.getPubSubConfig("subscribe", "topic"));
        final int qos             = (int) ConfigManager.getPubSubConfig("subscribe", "qos");
        final String clientId     = String.valueOf(ConfigManager.getPubSubConfig("subscribe", "clientId"));

        MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence());
        client.setCallback(this);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(false);

        L.info("Connecting to broker: {}", broker);
        client.connect(connOpts);

        L.info("Connected and subscribing message: qos -> {}, topic -> {}", qos, topic);
        client.subscribe(topic, qos);

        L.info("Please press any key if you would disconnect to broker.");
        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
        try{
            //標準入力を受け取るまで待ち続ける
            br.readLine();
        }catch(IOException e){
            System.exit(1);
        }
        client.disconnect();
        L.info("Disconnected");
    }
}

実行してみる

1. ブローカーの起動

ターミナルからmosquittoとコマンドを実行。コンソールに以下の内容が表示されたらok。

1463851949: mosquitto version 1.4.8 (build date 2016-02-14 11:22:37-0800) starting
1463851949: Using default config.
1463851949: Opening ipv4 listen socket on port 1883.
1463851949: Opening ipv6 listen socket on port 1883.

2. パブリッシャーからのメッセージ送信

Publisherのメインクラスを実行。Githubにあるサンプルプログラムはgradleを使っているので、以下のタスクを実行することで、メッセージを送信することが可能。

gradle -q publish

コンソールログに以下の内容が出力される。

[main] INFO mqtt.publish.MqttPublisher - Connecting to broker: tcp://localhost:1883
[main] INFO mqtt.publish.MqttPublisher - Connected and publishing message: qos -> 1, message -> Message from Mqtt Publish Sample
[main] INFO mqtt.publish.MqttPublisher - Message published and Disconneting broker
[main] INFO mqtt.publish.MqttPublisher - Disconnected

2行目から3行目にかけてメッセージが正常に送信されたことが出力されている。

その結果、ブローカーを実行したコンソール上には

463852292: New client connected from 127.0.0.1 as Publisher01 (c0, k60).
1463852292: Client Publisher01 disconnected.

と出力されており、ブローカー側から見てもパブリッシャーから接続があったことがわかる。

  1. サブスクライバーのメッセージ受信

最後にサブスクライバーのメインクラスを実行する。こちらもサンプルプログラムでは以下のタスクを実行する。

gradle -q subscribe

すると以下のログが出力される。

[main] INFO mqtt.subscribe.MqttSubscriber - Connecting to broker: tcp://localhost:1883
[main] INFO mqtt.subscribe.MqttSubscriber - Connected and subscribing message: qos -> 1, topic -> top/second/third
[MQTT Call: Subscriber01] INFO mqtt.subscribe.MqttSubscriber - Message arrived
[MQTT Call: Subscriber01] INFO mqtt.subscribe.MqttSubscriber - Topic:
[MQTT Call: Subscriber01] INFO mqtt.subscribe.MqttSubscriber - Message: Message from Mqtt Publish Sample
[main] INFO mqtt.subscribe.MqttSubscriber - Please press any key if you would disconnect to broker.

下から2行目にパブリッシャー側から送られたメッセージが出力されている。

サブスクライバーのコンソールで何かキーを打って、プログラムを終了させると、ブローカー側のコンソールログには以下のように出力される。

1463852522: New client connected from 127.0.0.1 as Subscriber01 (c0, k60).
1463852666: Client Subscriber01 disconnected.

参考にしたサイト