#2 「Confluent + InfluxDB + Grafana 」 で IoTデータをストリーミング処理後に可視化してみました


STEP-2 : InfluxDBでのBrokerからデータ受信確認

概要

Confluent Platform の「cp-all-in-one」 をベースにローカルのDockerコンテナ環境を構築し、IoTデータ生成Pythonプログラムから送信されるデータを Confluent でストリーミング処理し、InfluxDB+Grafanaでリアルタイム可視化できることを確認しました。該当する Sink Connector を使用しています。

以下の絵(の記事) の環境をベースに、

Consumer側の可視化の構成を追加する形で確認します。

以下の3つのステップで上記内容を順次説明します。今回は STEP-2 について説明します。
STEP-1.Dockerコンテナ環境での Confluent Platform の追加構築
STEP-2.InfluxDBでのBrokerからのデータ受信確認
STEP-3.InfluxDB経由のGrafanaでのリアルタイムデータ可視化確認

ローカル環境

macOS Big Sur 11.3
python 3.8.3
Docker version 20.10.7, build f0df350 (CPUs:8, Memory:10GB, Swap:1GB)

influxdb の設定

  1. STEP-1 で作成したローカルDocker環境に対して、ローカルターミナルから「influxdb」に接続し情報を確認します。
$ docker exec -it influxdb /bin/bash

root@influxdb:/# 
root@influxdb:/# uname -srn
Linux influxdb 5.10.25-linuxkit
root@influxdb:/# 
root@influxdb:/# influxd version
InfluxDB v1.8.6 (git: 1.8 v1.8.6)
root@influxdb:/# 
root@influxdb:/# influx -version
InfluxDB shell version: 1.8.6
root@influxdb:/# 
root@influxdb:/# exit
exit

2. 「influxdb」にデータベース「IoTSample」を作成します。

$ docker exec -it influxdb influx

Connected to http://localhost:8086 version 1.8.6
InfluxDB shell version: 1.8.6
> 
> create database IoTSample
> 
> show databases
name: databases
name
----
_internal
IoTSample
> 

コネクタの作成

  1. Confluent Platform の Control-Center へブラウザから http://localhost:9021 でアクセスします。
  2. 続いて、画面左側から「Connect」を選択し、新たに表示される「All Connect Clusters」画面から「connect-default」を選択し、その後に表示される「Connectors」画面右側から「+ Add connector」ボタンを押します。

  3. 新たに表示される「Browse」画面から「InfluxDBSinkConnector」を選択します。

  4. その後に表示される「Add Connector」画面の「topics」項目で「topic_201」と「topic_202」を選択し、続く「name」項目に「InfluxDBSinkConnector_1」というコネクタ名を入力します。

  5. コネクタ名を入力すると、入力項目が増えますので、以下の内容を入力し、最後に「Continu」ボタンを押します。

カテゴリ 入力項目
Which topics do you want to get data from? topics topic_201, topic_202
How should we connect to your data? name InfluxDBSinkConnector_1
Common tasks.max 1
Common Value converter class org.apache.kafka.connect.json.JsonConverter
InfluxDB InfluxDB API URL* http://influxdb:8086
Write InfluxDB Database IoTSample
Write Measurement Name Format ${topic}
Additional Properties value.converter.schemas.enable false

6. 設定内容に問題なければ、「Launch」ボタンを押します。

7. Connecotrs画面で作成したConnector「InfluxDBSinkConnector_1」が正常に起動していることを確認します。

InfluxDBでの確認

  1. ローカルターミナルから「influxdb」に接続し、データベース「IoTSample」にメジャーメント(テーブル)「topic_201」と「topic_202」が自動作成されていることを確認します。
$ docker exec -it influxdb influx

Connected to http://localhost:8086 version 1.8.6
InfluxDB shell version: 1.8.6
> 
> use IoTSample
Using database IoTSample
> 
> show measurements
name: measurements
name
----
topic_201
topic_202
> 

2. ローカルコンピュータ上で、ここにある IoTデータ生成プログラムを実行します。50件のデータを生成します。

$ python IoTSampleData-v5.py --mode mq --count 50 --wait 1

3. 次に各メジャーメント(テーブル)にデータが Write されているか確認します。

> 
> select * from topic_201
name: topic_201
time                id iot_num  iot_state proc section vol_1              vol_2
----                -- -------  --------- ---- ------- -----              -----
1626241925549000000 0  226-2461 島根県       111  M       198.1845397767455  57.268514136837176
1626241926554000000 1  145-2621 宮崎県       111  A       121.70584421224085 75.71752774135611
1626241927559000000 2  852-7187 熊本県       111  A       126.41080104674175 72.15478589353621
    :
    :
    :
1626408886875000000 47 440-1826 兵庫県       111  J       116.80492255443481 58.16002504591993
1626408887881000000 48 238-2420 長野県       111  Q       168.35017898970486 68.57868248992546
1626408888886000000 49 175-8107 高知県       111  S       132.90782936265464 86.22280607722584
> 
> 
> select * from topic_202
name: topic_202
time                IOT_NUM  IOT_STATE PROC SECTION VOL_1              VOL_2              ZZTIME
----                -------  --------- ---- ------- -----              -----              ------
1626408840741000000 327-9411 埼玉県       111  C       127.4471407715012  71.60478903611397  2021-07-16T13:13:59.693142
1626408848776000000 910-2198 兵庫県       111  E       194.46476675784402 70.33439224608537  2021-07-16T13:13:59.693268
1626408863809000000 134-0246 北海道       111  E       123.3766530207224  61.78121028564689  2021-07-16T13:13:59.693468
1626408876870000000 214-7209 埼玉県       111  C       101.40696510178657 88.97291678787539  2021-07-16T13:13:59.693639
1626408878878000000 103-2715 福島県       111  C       160.6706712489061  53.167263544338155 2021-07-16T13:13:59.693665
>

これで正常にBrokerのトピック経由でInfluxDBでデータ受信できることを確認できました。
次のステップでは、このメジャーメント(テーブル)のデータを Grafana でリアルタイム可視化できることを確認してみます。

本課題のステップ情報

STEP-1.Dockerコンテナ環境での Confluent Platform の追加構築
STEP-2.InfluxDBでのBrokerからのデータ受信確認
STEP-3.InfluxDB経由のGrafanaでのリアルタイムデータ可視化確認