#3 「Confluent + InfluxDB + Grafana 」 で IoTデータをストリーミング処理後に可視化してみました


STEP-3 : InfluxDB経由のGrafanaでのリアルタイムデータ可視化確認

概要

Confluent Platform の「cp-all-in-one」 をベースにローカルのDockerコンテナ環境を構築し、IoTデータ生成Pythonプログラムから送信されるデータを Confluent でストリーミング処理し、InfluxDB+Grafanaでリアルタイム可視化できることを確認しました。該当する Sink Connector を使用しています。

以下の絵(の記事) の環境をベースに、

Consumer側の可視化の構成を追加する形で確認します。

以下の3つのステップで上記内容を順次説明します。今回は STEP-3 について説明します。
STEP-1.Dockerコンテナ環境での Confluent Platform の追加構築
STEP-2.InfluxDBでのBrokerからのデータ受信確認
STEP-3.InfluxDB経由のGrafanaでのリアルタイムデータ可視化確認

ローカル環境

macOS Big Sur 11.3
python 3.8.3
Docker version 20.10.7, build f0df350 (CPUs:8, Memory:10GB, Swap:1GB)

InfluxDB上でのGrafana実装クエリの事前確認

  1. ローカルターミナルから「influxdb」に接続し、データベース「IoTSample」アタッチします。
$ docker exec -it influxdb influx

Connected to http://localhost:8086 version 1.8.6
InfluxDB shell version: 1.8.6
> 
> precision rfc3339
> use IoTSample
Using database IoTSample
> 

2. 次にメジャーメント「topic_201」に Write されてたデータに対し、過去1分間に5秒おきの「vol_1」「vol_2」の平均値を確認します。

> select mean("vol_1"), mean("vol_2") from topic_201 where time > now() - 1m group by time(5s)
> 

3. 次にメジャーメント「topic_202」に Write されてたデータに対し、過去1分間に5秒おきのデータ件数の平均値を確認します。

> select count("SECTION") from topic_202 where time > now() - 1m group by time(5s)
> 

4. 別のローカルターミナルを立ち上げ、ここにある IoTデータ生成プログラムを実行します。1秒間隔で60件のデータを生成します。

$ python IoTSampleData-v5.py --mode mq --count 60 --wait 1

5. おおよそ1分経過後に再度「2.」のコマンドを実行してみます。5秒間隔で平均値を確認することができます。

> select mean("vol_1"), mean("vol_2") from topic_201 where time > now() - 1m group by time(5s)
name: topic_201
time                 mean               mean_1
----                 ----               ------
2021-07-19T05:14:25Z 129.79202807021207 71.001749876844
2021-07-19T05:14:30Z 141.79314837145438 77.21202004901218
2021-07-19T05:14:35Z 142.43826406142995 74.59740864094402
2021-07-19T05:14:40Z 137.8580044308178  81.50934862038898
2021-07-19T05:14:45Z 134.873868501621   61.43409456176089
2021-07-19T05:14:50Z 140.78094848620384 71.32979813250802
2021-07-19T05:14:55Z 145.02310937360227 72.36065514410427
2021-07-19T05:15:00Z 137.3310038804828  72.15488447751763
2021-07-19T05:15:05Z 160.3206197676804  62.23388796505937
2021-07-19T05:15:10Z 161.37451801550156 67.43620910989075
2021-07-19T05:15:15Z 143.62931215874102 79.41971050593733
2021-07-19T05:15:20Z 147.3842739112013  71.52211973488917
2021-07-19T05:15:25Z 120.39914758176593 86.95686303610904
> 

6. 続いて、再度「3.」のコマンドを実行してみます。5秒間隔でデータ件数を確認することができます。

> select count("SECTION") from topic_202 where time > now() - 1m group by time(5s)
name: topic_202
time                 count
----                 -----
2021-07-19T05:14:30Z 0
2021-07-19T05:14:35Z 0
2021-07-19T05:14:40Z 1
2021-07-19T05:14:45Z 0
2021-07-19T05:14:50Z 0
2021-07-19T05:14:55Z 0
2021-07-19T05:15:00Z 1
2021-07-19T05:15:05Z 0
2021-07-19T05:15:10Z 0
2021-07-19T05:15:15Z 0
2021-07-19T05:15:20Z 0
2021-07-19T05:15:25Z 0
2021-07-19T05:15:30Z 0
> 

この「2.」「3.」の2つのクエリをGrafanaのダッシュボード作成時に参考として使用します。

Grafana と influxdb の接続設定

  1. ブラウザから http://localhost:3000 で Grafana へアクセスします。
  2. Login画面が表示されるので、「username」と「Password」にデフォルトの「admin」を入力し、「Login」ボタンを押します。デフォルトパスワードの変更を求められますが、無視して画面下部の「Skip」を押します。

  3. 次に表示される「General / Home」画面で、「DATA SOURCE」を選択します。「Add data source」画面の「Time service databases」リストから「InfluxDB」の「Select」ボタンを押します。

  4. 次に表示される「Data Source / InfluxDB」画面で、以下の値を後、画面下部の「Save & test」ボタンを押し、正常値の「Data source is working」が表示されるのを確認します。

カテゴリ 入力項目
HTTP URL http://influxdb:8086
InfluxDB Detailes Database IoTSample

これで、Grafana から InfluxDB の「IoTSample」データベースへのアクセスを確認できました。

Grafana のダッシュボードの作成

  1. 「General / Home」画面で、「DASHBORDS」を選択します。メジャーメント「topic_201」のデータをグラフ表示するために「New Dashboard」-「Add Panel」画面から「Add an empty panel」を選択します。

  2. 「New Dashboard / Edit panel」画面が表示されるので、画面右の「Title」に「Vol_1 / Vol_2 平均値」と入力し、その後、画面中央のクエリ編集ボタン(鉛筆マーク)を押し、以下のクエリを入力します。

select mean("vol_1"), mean("vol_2") from topic_201 where time > now() - 5m group by time(5s)

3. データのグラフ表示に問題がなければ(該当期間のデータがなければ、IoTデータ生成プログラムを実行する必要あり)、画面右上の「Apply」のボタンを押します。

4. 次に、ジャーメント「topic_202」のデータをグラフ表示するために、右上の「Add Panel」を押し、新たに表示される「New Dashboard」-「Add Panel」画面から「Add an empty panel」を選択します。

5. 「New Dashboard / Edit panel」画面が表示されるので、画面右の「Title」に「Critical / Error / Warning 件数」と入力し、また、「Graph styles」-「style」で「Bars」を選択します。その後、画面中央のクエリ編集ボタン(鉛筆マーク)を押し、以下のクエリを入力します。

select count("SECTION") from topic_202 where time > now() - 5m group by time(5s)

6. データのグラフ表示に問題がなければ(該当期間のデータがなければ、IoTデータ生成プログラムを実行する必要あり)、画面右上の「Apply」のボタンを押します。

Grafana ダッシュボードでのリアルタイム可視化

  1. 表示されている「New Dashbord」画面の右側で、「Local browser time」に「Last 5 minutes」を、「Reflash dashbord」に「5s」を選択設定します。

  2. ローカルターミナルから IoTデータ生成プログラムを実行します。0.5秒間隔で600件のデータを生成します。

$ python IoTSampleData-v5.py --mode mq --count 600 --wait 0.5

3. IoTデータがリアルタイム表示されることを確認します(開始数秒後の画面と終了直後の画面)。

4. 設定内容を保存しておきます。画面左上のGrafanaアイコンをクリックします。保存確認の画面が表示され、「save dashbord」を選択します。次に表示される画面の「Dashbord name」に「IoTSample」と入力し「Save」ボタンを押します。

5. 最後にダッシュボードの設定内容をエクスポートしておきます。画面上部の星印の右隣にある変なマーク? をクリックします。新たに表示されるの画面の上部から「Export」を選択します。画面表示が切り替わるので「Save to file」ボタンを押します。

これで正常に、IoTデータ生成プログラム → RabbitMQ → SourceConnector → Broker → SinkConnector → InfluxDB → Grafanaでデータのリアルタイム可視化ができることを確認できました。

本課題のステップ情報

STEP-1.Dockerコンテナ環境での Confluent Platform の追加構築
STEP-2.InfluxDBでのBrokerからのデータ受信確認
STEP-3.InfluxDB経由のGrafanaでのリアルタイムデータ可視化確認