SwitchBot を MQTT で操作する


先日「SwitchBot / Raspberry Pi / SORACOM Air for セルラーで「どこでもボタンを押してくれるロボット」
」を作りましたが、こうなると遠隔で操作したくなるのが人の SaGa 。なので今回は MQTT を使った操作をまとめました。

論よりコード

今回は AWS IoT Core を MQTT ブローカーとして使用していますが、Google IoT Core や PubNub など、MQTTブローカーだったらなんでも大丈夫でしょう。

仕様としては

  • Subscribe トピック: /devices/SwitchBot1/commands/#
  • Payload ドキュメント: {"state":{"command":"METHOD"}}
    • METHOD に対応した Controller のメソッドが実行される
    • 例) {"state":{"command":"press"}}Controller().press() が実行される

という実装です。

AWS IoT Core

AWS IoT Core の設定方法は割愛しますが、X.509 証明書による MQTTS(MQTT + TLS)による通信となるため、証明書の発行を AWS IoT Core 上で行います。

  1. 安全性 > ポリシー でポリシー作成 (一回のみ行う)
    1. 初めての場合は「アクション=iot:* リソースARN=*」としたポリシーで試してみてください(全権限という意味になるため、最終的には削除してください)
  2. 安全性 > 証明書 で以下の作業 (デバイス毎に行う)
    1. 証明書(証明書、パブリックキー、プライベートキー)の作成&ダウンロード
    2. 証明書の有効化
    3. ポリシーのアタッチ
  3. Amazon ルート CA をダウンロード (一回のみ行う)
    1. ※ 利用するのは Amazon ルート CA 1 が無難です

以上を行うことで、手元に以下のファイルが揃っていることを確認します。

  • 証明書 (XXXXXXXX-certificate.pem.crt)
  • プライベートキー (XXXXXXXX-private.pem.key)
  • Amazon ルート CA 1 (AmazonRootCA1.pem)

※ パブリックキー (XXXXXXXX-public.pem.key) は今回の手順においては使用しません。

必要なもの;

以下をすべて SwitchBot を制御するデバイス(e.g. Raspberry Pi) 内に集めます。すべて単一のディレクトリに保存しています。

  • switchbot.py (と、それを動かすのに必要なライブラリ等)
  • X.509 証明書ファイル群
    • 証明書 (XXXXXXXX-certificate.pem.crt)
    • プライベートキー (XXXXXXXX-private.pem.key)
    • Amazon ルート CA 1 (AmazonRootCA1.pem)

その他、情報として以下を入手しておきます。

作業

$ sudo apt install python3-paho-mqtt

以下 main.py では

  • ENDPOINT には AWS IoT Core のカスタムエンドポイント
  • SWITCHBOT_MACADDRESS: 操作対象の SwitchBot の MAC アドレス

に変更してください。

main.py
#!/usr/bin/env python3

import time
import paho.mqtt.client as mqtt # python3-paho-mqtt
import json
import switchbot

ENDPOINT = {"host": "YourEndpoint-ats.iot.REGION.amazonaws.com", "port": 8883}
THING_NAME = "SwitchBot1"

SWITCHBOT_MACADDRESS = "E2:94:FF:XX:XX:XX"

import ssl
CRT = "./XXXXXXXX-certificate.pem.crt`"
KEY = "./XXXXXXXX-private.pem.key"
rCA = "./AmazonRootCA1.pem"

class Controller:
    def press(self):
        switchbot.SwitchBot(SWITCHBOT_MACADDRESS).press()

class GeneralMqtt:
    def delta_topic(thing_name):
        return "/devices/{}/commands/#".format(thing_name) # Likes command topic of GCP IoT Core
    def extract(obj):
        return obj['state']['command'] # Likes shadow doc of AWS IoT Core

def on_connect(client, userdata, flag, rc):
    delta_topic = GeneralMqtt.delta_topic(THING_NAME)
    client.subscribe(delta_topic)

def on_message(client, userdata, msg):
    print(msg.payload, flush=True)
    obj = json.loads(msg.payload)
    method_name = GeneralMqtt.extract(obj)
    getattr(Controller(), method_name)()

def on_disconnect(client, userdata, rc):
    print("on_disconnect({})".format(rc))
    if rc != 0:
        quit(rc) # and expect respawn by Supervisor (e.g. systemd)

def on_log(client, obj, level, string):
    print(string, flush=True)

def main():
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.on_message = on_message
    client.on_log = on_log
    client.tls_set(ca_certs=rCA, certfile=CRT, keyfile=KEY, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
    print("Connect to {}".format(ENDPOINT["host"]))
    client.connect(ENDPOINT["host"], port=ENDPOINT["port"], keepalive=600)
    client.loop_forever()

if __name__ == '__main__':
    main()

ここまでの作業で、ディレクトリ内が以下のようになります。

$ ls -w 1
91ffd4c88d-certificate.pem.crt
91ffd4c88d-private.pem.key
AmazonRootCA1.pem
main.py
switchbot.py

実行

main.py のディレクトリで main.py を起動すれば、以下のように /devices/SwitchBot1/commands/# というトピックで待ち受けが開始します。

Ctrl+C で停止です。

$ python3 main.py
Connect to YourEndpoint-ats.iot.REGION.amazonaws.com
Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k600) client_id=b''
Received CONNACK (0, 0)
Sending SUBSCRIBE (d0, m1) [(b'/devices/SwitchBot1/commands/#', 0)]
Received SUBACK
トラブルシュート
  • AWS IoT Core のカスタムエンドポイントの状態 (有効になっている必要があります)
  • AWS IoT Core のポリシー設定 (権限が不足している場合があります)
  • AWS IoT Core のリージョン (リージョンは一致させる必要があります)

テスト

AWS IoT Core のテストの画面から /devices/SwitchBot1/commands/a トピック宛に {"state":{"command":"press"}} を発行すると Raspberry Pi 側で以下のように表示されて SwitchBot が動きます。

Received PUBLISH (d0, q0, r0, m0), '/devices/SwitchBot1/commands/a', ...  (22 bytes)
b'{"state":{"command":"press"}}'

バックグラウンド動作化

systemd を使ってバックグラウンド動作できるようにします。
今回は pi という一般ユーザでもバックグラウンド動作できるようにします。

$ mkdir -p $HOME/.config/systemd/user
$HOME/.config/systemd/user/switchbot.service
[Unit]
Description=SwitchBot controller using MQTT
After=network-online.target

[Service]
Type=simple
WorkingDirectory=/home/pi/switchbot
ExecStart=/usr/bin/env python3 -B main.py
ExecStop=/bin/kill -INT ${MAINPID}
Restart=always

[Install]
WantedBy=default.target
$ systemctl --user enable switchbot.service
$ systemctl --user start switchbot.service
$ sudo loginctl enable-linger pi

あとがき

こうやって見ると、BLEデバイスを動かせて、コードを書き、systemd は知ってて、X.509証明書が使えて、MQTT 仕様を利用し、クラウド上のフルマネージドサービスを活用できる必要があるのだから、総力戦感がすごい。

EoT