Kafka ConnectによるMQTTデータの登録
Kafka Connectに関する記事が少なく、今回かなり苦慮したのでメモ。
Kafka Connectというのは最近のKafkaに付属しているツールで、コードレスでKafkaにデータを流し込めるようにするもの。
Kafkaに流す際にはArvoというスキーマ、シリアライザを利用する。てっきりスキーマレスだと勘違いしていたので、この辺をFIXするのに大分苦戦しました。
Apache Kafkaのインストール
KafkaのインストールにはDockerが便利で、Zookeeperなどと一緒にデプロイすれば簡単にスケールもしてくれます。
ちなみに、私が調べた限りKafka業界で著名なのはConfluentとLANDOOPで、どちらもKafka周りのツールを作ったり、PaaSなどを提供していたりします。
今回はLANDOOP社がDocker-Hubにアップしているfast-data-devを使います。
個別にKafkaをインストールしていってもよいけれど、今回はConnectに加えてSchema Registryも使うので、非常に便利でした。
ちなみに、このパッケージにはLandoop Stream Reactor 25+ Connectorsと呼ばれるコネクタが最初からインストールされています。3rd Partyのコネクタを使うこともできますが、これをそのまま使ってみようと思いました。
Docke-Hubのサイトにはdockerコマンドの例が載っていますが、ここではdocker-composeを使います。
environment, volumesは環境に合わせて使ってください。
docker-compose up -d コマンドで立ち上げます。
version: '2'
services:
landoop:
image: "landoop/fast-data-dev"
network_mode: "host"
container_name: "landoop"
mem_limit: 3G
environment:
ADV_HOST: "EXPORTするIPアドレス"
USER: "Basic認証のユーザ名"
PASSWORD: "Basic認証のパスワード"
volumes:
- /path/to/3rd/Party/Connector/Jars:/connectors
ports:
- "2181:2181"
- "3030:3030"
- "8081-8083:8081-8083"
- "9581-9585:9581-9585"
- "9092:9092"
MQTTの準備
Kafka Connectを使うと、SUBSCRIBEして登録するデータのスキーマが違ったりすると落ちてしまうので、事前にしっかり成形するべきでしょう。
ここではペイロードはJSONにします。JSONにしておくと、Stream Reactor ConnectorsだけでKafkaに登録ができるからです。スキーマファイルを事前に用意して、コネクタ登録時に指定することもできますが、もしない場合は勝手に生成されます。Avroのスキーマファイルはこんな感じで、Schema Registryに登録されます。JSONなので見たらわかりますね。
ちなみに、MQTTだと沢山のTOPIC名があって、#や+といったワイルドカードを使って柔軟にSUBSCRIBEしますが、Kafkaに登録するときはTOPICは基本的に1つのIngest Topicとなります。ワイルドカードを書いても無視されます。
Stream Reactor ConnectorsによるMQTTの入力
前述の通り、LANDOOP社の純正ツールを使ってみます。一応マニュアルがありますが、注意しないとはまります。
普通にやろうとすると、http://localhost:3030とかにできるUIのCONNECTORのリンクから生成するのだろうけれど、パラメータに記述するKCQLというDSLのせいで入力ができません。ふざけてますね。
登録の仕方は、CURLを使うのが良いです。例えば以下の感じです。上記のdocker-compose.ymlを使えば、Schema Registryは8083ポートで上がっています。そこを指定します。MQTT Brokerはlocalhostに立っている想定です。
curl -X POST http://localhost:8083/connectors -H 'Content-Type: application/json' -H 'Accept: application/json' -d '{
"name":"mqtt-kafka",
"config":
{"connector.class": "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector",
"tasks.max": "1",
"connect.mqtt.kcql": "INSERT INTO sensors SELECT * FROM /sensors WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter`",
"connect.mqtt.service.quality": "0",
"connect.mqtt.password": "ブローカーのパスワード",
"connect.mqtt.username": "ユーザ名",
"connect.mqtt.hosts": "tcp://localhost:1883",
"connect.mqtt.converter.throw.on.error": "true"
}'
入力に使うJSONはnameとconfigが必要で、後者にネストされて各種パラメータを記載します。
connect.mqtt.kcqlというのがSQLライクなDSLになっています。この書き方だと、/sensorsというトピック名のペイロードをsensorsというKeyでKafkaに登録しようとします。その際、JsonSimpleConverterというパーサーを使って、MQTTのペイロードが処理されます。
ちなみにCONNECT CLIというのもあります。これを使ってもよさそう。
何か登録の際に問題が生じるなどした際は、http://localhost:3030/logs/connect-distributed.logにアクセスするとエラーログを見ることができます。
自作のコネクタの利用
fast-data-devの場合は、/connectorというパスの部分に自作のコネクタの設置をすればクラスパスに登録されるようです。
それに関してはこのサイトが参考になります。
Author And Source
この問題について(Kafka ConnectによるMQTTデータの登録), 我々は、より多くの情報をここで見つけました https://qiita.com/Takashi_Kasuya/items/0c6fbdba6c4631f2ea38著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .