#3 「Confluent + RabbitMQ」 で IoTデータを ストリーミング処理してみました


STEP-3 : ストリーミング処理後のデータ受信確認

概要

Confluent Platform の「cp-all-in-one」 をベースにローカルのDockerコンテナ環境を構築し、IoTデータ生成Pythonプログラムから送信されるデータをRabbitMQで受信し、該当する Source Connector を使用し、Confluent でストリーミング処理をできることを確認しました。

以下の3つのステップで上記内容を順次説明します。今回は STEP-3 について説明します。
STEP-1.Dockerコンテナ環境での Confluent Platform の構築
STEP-2.RabbitMQ経由のBrokerでのデータ受信確認
STEP-3.ストリーミング処理後のデータ受信確認

ローカル環境

macOS Big Sur 11.3
python 3.8.3
Docker version 20.10.7, build f0df350 (CPUs:8, Memory:10GB, Swap:1GB)

ストリーミング処理後のデータ受信トピックの作成

  1. Confluent Platform の Control-Center へブラウザから http://localhost:9021 でアクセスします。
  2. 画面左側から「Topics」を選択し、新たに表示される「All topics」画面の右側にある「+ Add a topic」ボタンを押します。その後に表示される「New topic」画面の「Topic name」に「topic_202」を入力し、「Create with defaults」ボタンを押します。

Producer側の KsqlDB Stream の作成

  1. 続いて、画面左側から「ksqlDB」を選択し、新たに表示される「ksqlDB」画面から「ksqldb1」を選択し、その後に表示される「ksqldb1」画面の上部タグから「Streams」を選択します。切り替わった画面の左側から「Add Stream」ボタンを押します。

  2. 新たに表示される「Create a ksqlDB Stream」画面のリストから「topic_201」を選択します。その後に拡張表示される画面の「STREAM name」に「stream_201」を、「Value format」に「JSON」を選択します。それ以外の項目はデフォルト表示のまま、「Save STREAM」ボタンを押します。

Producer側 Stream でのデータ受信

  1. ローカルコンピュータ上で、STEP-2のIoTデータ生成プログラムを実行します。
$ python IoTSampleData-v5.py --mode mq --count 5 --wait 1

2. 表示されている「ksqldb1」画面の上部タグから「Flow」を選択し、先程作成した「STREAM_201」を選択します。そうすると、画面右側の「STREAM_201」に、上記で生成したデータが表示されることが確認できます。I

Consumer側の KsqlDB Stream の作成

  1. KsqlDBのクエリー機能を使用してデータを抽出するには、Confluent Platform の Control-Center では作成できない(?)ので、「ksqldb-cli」経由で「ksqldb-server」を操作します。まずは、「ksqldb-cli」に接続します。
$ docker exec -it ksqldb-cli /bin/bash
[appuser@56b432e8a452 ~]$

2. 「ksqldb-server」に接続します。

[appuser@56b432e8a452 ~]$ ksql http://ksqldb-server:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.

                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =  Event Streaming Database purpose-built =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2020 Confluent Inc.

CLI v6.0.0, Server v6.0.0 located at http://ksqldb-server:8088

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql> 

3. ストリーミング処理用クエリを含んだストリームを作成します。 作成後の確認も行います。
- 「stream_201」のストリーミングデータを以下の条件で抽出し、その結果を「topic-202」に送信するストリーム「stream_202」を作成
- 抽出条件: section='E' OR section='C' OR section='W'

ksql> CREATE STREAM stream_202 WITH (KAFKA_TOPIC = 'topic_202', VALUE_FORMAT='JSON') AS SELECT s201.section as section, s201.time as zztime, s201.proc as proc, s201.iot_num as iot_num, s201.iot_state as iot_state, s201.vol_1 as vol_1, s201.vol_2 as vol_2 FROM stream_201 s201 WHERE section='E' OR section='C' OR section='W';

 Message                                 
-----------------------------------------
 Created query with ID CSAS_STREAM_202_0 
-----------------------------------------
ksql> 
ksql> describe extended stream_202;

Name                 : STREAM_202
Type                 : STREAM
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : JSON
Kafka topic          : topic_202 (partitions: 1, replication: 1)
Statement            : CREATE STREAM STREAM_202 WITH (KAFKA_TOPIC='topic_202', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='JSON') AS SELECT
  S201.SECTION SECTION,
  S201.TIME ZZTIME,
  S201.PROC PROC,
  S201.IOT_NUM IOT_NUM,
  S201.IOT_STATE IOT_STATE,
  S201.VOL_1 VOL_1,
  S201.VOL_2 VOL_2
FROM STREAM_201 S201
WHERE (((S201.SECTION = 'E') OR (S201.SECTION = 'C')) OR (S201.SECTION = 'W'))
EMIT CHANGES;

 Field     | Type            
-----------------------------
 SECTION   | VARCHAR(STRING) 
 ZZTIME    | VARCHAR(STRING) 
 PROC      | VARCHAR(STRING) 
 IOT_NUM   | VARCHAR(STRING) 
 IOT_STATE | VARCHAR(STRING) 
 VOL_1     | DOUBLE          
 VOL_2     | DOUBLE          
-----------------------------

Queries that write from this STREAM
-----------------------------------
CSAS_STREAM_202_0 (RUNNING) : CREATE STREAM STREAM_202 WITH (KAFKA_TOPIC='topic_202', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='JSON') AS SELECT   S201.SECTION SECTION,   S201.TIME ZZTIME,   S201.PROC PROC,   S201.IOT_NUM IOT_NUM,   S201.IOT_STATE IOT_STATE,   S201.VOL_1 VOL_1,   S201.VOL_2 VOL_2 FROM STREAM_201 S201 WHERE (((S201.SECTION = 'E') OR (S201.SECTION = 'C')) OR (S201.SECTION = 'W')) EMIT CHANGES;

For query topology and execution plan please run: EXPLAIN <QueryId>

Local runtime statistics
------------------------


(Statistics of the local KSQL server interaction with the Kafka topic topic_202)
ksql> 

4. 作成した2つのストリームを確認します。

ksql> show streams;

 Stream Name         | Kafka Topic                 | Format 
------------------------------------------------------------
 KSQL_PROCESSING_LOG | default_ksql_processing_log | JSON   
 STREAM_201          | topic_201                   | JSON   
 STREAM_202          | topic_202                   | JSON   
------------------------------------------------------------
ksql> 

5. Confluent Platform の Control-Center へブラウザから http://localhost:9021 でアクセスし、「ksqldb」の「Flow」を確認します。上記で作成したフローが表示されています。

Consmer側 Stream でのデータ受信

  1. ローカルコンピュータ上で、STEP-2のIoTデータ生成プログラムを実行します。抽出条件を定義しているので、50件のデータを生成します。
$ python IoTSampleData-v5.py --mode mq --count 50 --wait 1

2. 表示されている画面の「STREAM_202」を選択します。そうすると、画面右側の「STREAM_202」に、抽出されたデータのみが表示されることが確認できます。I

3. 「topic_202」でも、抽出されたデータのみが表示されることが確認できます。I

これで、「topic_201」のデータをストリーミング処理(クエリ処理)し、データ抽出結果を「topic_202」で確認できました。

最後に

3つのステップを経て、IoTデータ生成プログラムからデータを送信し、RabbitMQ経由でBrokerの「topic_201」でデータ受信し、ストリーミング処理でデータを抽出できることを確認できました。

本課題のステップ情報

STEP-1.Dockerコンテナ環境での Confluent Platform の構築
STEP-2.RabbitMQ経由のBrokerでのデータ受信確認
STEP-3.ストリーミング処理後のデータ受信確認