Kafka ConnectによるMQTTデータの登録


Kafka Connectに関する記事が少なく、今回かなり苦慮したのでメモ。
Kafka Connectというのは最近のKafkaに付属しているツールで、コードレスでKafkaにデータを流し込めるようにするもの。
Kafkaに流す際にはArvoというスキーマ、シリアライザを利用する。てっきりスキーマレスだと勘違いしていたので、この辺をFIXするのに大分苦戦しました。

Apache Kafkaのインストール

KafkaのインストールにはDockerが便利で、Zookeeperなどと一緒にデプロイすれば簡単にスケールもしてくれます。
ちなみに、私が調べた限りKafka業界で著名なのはConfluentLANDOOPで、どちらもKafka周りのツールを作ったり、PaaSなどを提供していたりします。

今回はLANDOOP社がDocker-Hubにアップしているfast-data-devを使います。
個別にKafkaをインストールしていってもよいけれど、今回はConnectに加えてSchema Registryも使うので、非常に便利でした。
ちなみに、このパッケージにはLandoop Stream Reactor 25+ Connectorsと呼ばれるコネクタが最初からインストールされています。3rd Partyのコネクタを使うこともできますが、これをそのまま使ってみようと思いました。

Docke-Hubのサイトにはdockerコマンドの例が載っていますが、ここではdocker-composeを使います。
environment, volumesは環境に合わせて使ってください。
docker-compose up -d コマンドで立ち上げます。

docker-compose.yml
version: '2'

services:
  landoop:
    image: "landoop/fast-data-dev"
    network_mode: "host"
    container_name: "landoop"
    mem_limit: 3G
    environment:
      ADV_HOST: "EXPORTするIPアドレス"
      USER: "Basic認証のユーザ名"
      PASSWORD: "Basic認証のパスワード"
    volumes:
      - /path/to/3rd/Party/Connector/Jars:/connectors
    ports:
      - "2181:2181"
      - "3030:3030"
      - "8081-8083:8081-8083"
      - "9581-9585:9581-9585"
      - "9092:9092"

MQTTの準備

Kafka Connectを使うと、SUBSCRIBEして登録するデータのスキーマが違ったりすると落ちてしまうので、事前にしっかり成形するべきでしょう。
ここではペイロードはJSONにします。JSONにしておくと、Stream Reactor ConnectorsだけでKafkaに登録ができるからです。スキーマファイルを事前に用意して、コネクタ登録時に指定することもできますが、もしない場合は勝手に生成されます。Avroのスキーマファイルはこんな感じで、Schema Registryに登録されます。JSONなので見たらわかりますね。

ちなみに、MQTTだと沢山のTOPIC名があって、#や+といったワイルドカードを使って柔軟にSUBSCRIBEしますが、Kafkaに登録するときはTOPICは基本的に1つのIngest Topicとなります。ワイルドカードを書いても無視されます。

Stream Reactor ConnectorsによるMQTTの入力

前述の通り、LANDOOP社の純正ツールを使ってみます。一応マニュアルがありますが、注意しないとはまります。
普通にやろうとすると、http://localhost:3030とかにできるUIのCONNECTORのリンクから生成するのだろうけれど、パラメータに記述するKCQLというDSLのせいで入力ができません。ふざけてますね。

登録の仕方は、CURLを使うのが良いです。例えば以下の感じです。上記のdocker-compose.ymlを使えば、Schema Registryは8083ポートで上がっています。そこを指定します。MQTT Brokerはlocalhostに立っている想定です。

curl -X POST http://localhost:8083/connectors -H 'Content-Type: application/json' -H 'Accept: application/json'   -d '{
  "name":"mqtt-kafka",
  "config":
    {"connector.class": "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector",
    "tasks.max": "1",
    "connect.mqtt.kcql": "INSERT INTO sensors SELECT * FROM /sensors WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter`",
    "connect.mqtt.service.quality": "0",
    "connect.mqtt.password": "ブローカーのパスワード",
    "connect.mqtt.username": "ユーザ名",
    "connect.mqtt.hosts": "tcp://localhost:1883",
    "connect.mqtt.converter.throw.on.error": "true"
  }'

入力に使うJSONはnameとconfigが必要で、後者にネストされて各種パラメータを記載します。
connect.mqtt.kcqlというのがSQLライクなDSLになっています。この書き方だと、/sensorsというトピック名のペイロードをsensorsというKeyでKafkaに登録しようとします。その際、JsonSimpleConverterというパーサーを使って、MQTTのペイロードが処理されます。

ちなみにCONNECT CLIというのもあります。これを使ってもよさそう。

何か登録の際に問題が生じるなどした際は、http://localhost:3030/logs/connect-distributed.logにアクセスするとエラーログを見ることができます。

自作のコネクタの利用

fast-data-devの場合は、/connectorというパスの部分に自作のコネクタの設置をすればクラスパスに登録されるようです。
それに関してはこのサイトが参考になります。