IoTとBME280で遊んでみた - 2


AWS IoTに送信したセンサー(BME280)のデータを、Kinesis Analyticsで分析してみました。

必要なもの

データの流れ

センサー(BME280)からAWS IoTに送信した温度・湿度・気圧データに対し、Kinesis Data Analyticsで異常検知のデータ(ANOMALY SCORE)を付加してS3に出力し、Athena + QuickSightでグラフ化してみました。

前回(AWS IoTとBME280で遊んでみた)は、BME280からAWS IoTへ温度・湿度・気圧データを送信するところまででしたが、それ以降のデータの流れの部分を作成します。

  1. BME280からAWS IoTへ温度・湿度・気圧データを送信
  2. AWS IoTルールにより受信した温度・湿度・気圧データをKinesis Data Firehoseへ送信
  3. Kinesis Data Firehoseで受けたデータをS3(バケット:iot-bme280)に出力すると同時に、Kinesis Data AnalyticsでANOMALY SCOREを付加し、別のKinesis Data Firehoseに送信
  4. Kinesis Data Firehoseで受けたANOMALY SCORE付データをS3(バケット:iot-bme280-2)に出力
  5. 上記S3出力は、JSON Lines形式から改行を省いた形式になっているので、Lambdaで各JSON毎に改行を入れてJSON Lines形式に変更し、S3(バケット:iot-bme280-3)へ出力
  6. S3(バケット:iot-bme280-3)に対し、Glueのクローラーでテーブルを定義してAthenaでクエリを実行
  7. QuickSightでグラフ化

0. サンプルプログラム(basicPubSub.py)の修正

前回の修正では現在日時は送信していませんでしたが、現在日時も送信するようにサンプルプログラム(basicPubSub.py)を修正してRaspberryPi上で実行します。

basicPubSub.py
# Publish to the same topic in a loop forever
loopCount = 0
while True:
    if args.mode == 'both' or args.mode == 'publish':
        temperature, pressure, humidity = bme280.read_bme280()

        message = {}
        message['datetime']    = str(datetime.utcnow())

1. S3バケット2個、Kinesis Data Firehose2個の作成

Kinesis Data Firehose2個とそれぞれのFirehoseの出力用にS3バケット2個作成します。

Firehose名 S3バケット名
1段目 bme280 iot-bme280
2段目 bme280-2 iot-bme280-2

2. AWS IoT ルールの作成

"MyTopic"というトピック名で受信したデータを全てKinesis Data Firehose(名前:bme280)に送信するルールを作成します。

※上記のように、Separatorは"\n (改行)"を選択します。

この時点でS3バケット(iot-bme280)にデータがたまりはじめます。

3. Kinesis Data Analyticsの設定

下記のようにFirehoseをsourceとdestinationに指定します。

source destination
bme280 bme280-2

SQLは、Example: Detecting Data Anomalies on a Streamを参考にRANDOM_CUT_FOREST関数を実装し、データにANOMALY SCOREを付加します。

SQL部分

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("datetime" TIMESTAMP, "temperature" REAL,"humidity" DOUBLE, "pressure" DOUBLE, ANOMALY_SCORE DOUBLE);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM
        "datetime","temperature","humidity","pressure",
        ANOMALY_SCORE 
    FROM TABLE(RANDOM_CUT_FOREST(CURSOR(SELECT STREAM "datetime", cast("temperature" as DOUBLE) as"temperature","humidity", "pressure" FROM "SOURCE_SQL_STREAM_001")));

ここまでで、この時点でS3バケット(iot-bme280-2)にANOMALY SCOREが付加されたデータがたまりはじめます。

4. LambdaとS3バケットの作成

Firehoseは複数のJSONが1行にくっついた形式でS3に出力しますがこれではAthenaで読めないため、1行1JSONのJSON Lines形式に変更する必要があります。

その変換用のLambdaと変換後のデータを出力するS3バケット(iot-bme280-3)を作成し、S3バケット(iot-bme280-2)に対しオブジェクトが作成されたらLambdaを起動するようにイベントを設定します。

  • Firehoseの出力形式 = 複数のJSONが1行にくっついている形式
{"datetime":"2018-03-12 12:01:13.995","temperature":15.41,"humidity":72.4736328125,"pressure":1021.46828125,"ANOMALY_SCORE":1.2377089110080235}{"datetime":"2018-03-12 12:01:15.265","temperature":15.41,"humidity":72.4736328125,"pressure":1021.5043359375001,"ANOMALY_SCORE":1.2386195886695075}{"datetime":"2018-03-12 12:01:16.535","temperature":15.41,"humidity":72.4638671875,"pressure":1021.46828125,"ANOMALY_SCORE":1.2401199616651675}
  • Athenaで読める形式
{"datetime":"2018-03-12 12:01:13.995","temperature":15.41,"humidity":72.4736328125,"pressure":1021.46828125,"ANOMALY_SCORE":1.2377089110080235}
{"datetime":"2018-03-12 12:01:15.265","temperature":15.41,"humidity":72.4736328125,"pressure":1021.5043359375001,"ANOMALY_SCORE":1.2386195886695075}
{"datetime":"2018-03-12 12:01:16.535","temperature":15.41,"humidity":72.4638671875,"pressure":1021.46828125,"ANOMALY_SCORE":1.2401199616651675}
  • 変換用Lambda
import boto3
import urllib.parse

BUCKET_OUT = 'iot-bme280-3'

s3 = boto3.resource('s3')


def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    s3obj_in = s3.Object(bucket, key)
    body_in = s3obj_in.get()['Body'].read().decode('utf-8')
    body_out = body_in.replace('}{', '}\n{')

    s3obj_out = s3.Object(BUCKET_OUT, key)
    s3obj_out.put(Body=body_out.encode('utf-8'))

※ 変換部分はちょっと手抜きしてます。

この時点でS3バケット(iot-bme280-3)にAthenaで読めるデータがたまりはじめます。

5. Glue Data CatalogにTableを作成

GlueでS3バケット(iot-bme280-3)に対しクロールして、Data CatalogにTableを作成します。

AthenaでSQLを実行してデータが取得できることを確認します。

※変換用Lambdaで出力したS3のキーのフォーマットがHiveフォーマットでないため、クロールした時点で存在するパーティションのデータしか確認できません。再度クロールすれば、新しくできたパーティションのデータを確認できるようになります。
https://docs.aws.amazon.com/ja_jp/athena/latest/ug/partitions.html

  • 現在のフォーマット s3://iot-bme280-3/2018/03/14/19/..
  • Hiveフォーマット s3://iot-bme280-3/year=2018/month=03/day=14/hour=19/..

6. QuickSightでグラフを作成

QuickSightの画面で以下のところで
 [New Analysis] -> [data set] -> [New Data Set] -> [Athena] -> [Create Data Set]
作成したTableを指定してグラフを作成します。

山になっているところは、BMS280に対しドライヤー攻撃をした結果になります。
温度の急上昇に合わせAnamaly Scoreも上昇していることが確認できます。

はまりポイント

1. RANDOM_CUT_FOREST関数がうまく動作しない

温度は 15.25 のように整数2桁小数2桁ですが多分DECIMALと判断されてしまったため、RANDOM_CUT_FOREST関数に入力しても入力データとしては無視されました。(BME280にドライヤー攻撃してもANOMALY_SCOREの値が全然変化しない)
そのため、明示的にDOUBLEに型変換してからRANDOM_CUT_FOREST関数に入力しています。

RANDOM_CUT_FOREST
 -- DECIMAL is not a supported type. Use DOUBLE instead. --

2. QuickSightがエラーになる

QuickSightからAthena経由でアクセスする場合は、S3へのアクセス権限を設定する必要があります。
こちらの記事(Amazon QuickSightで「Insufficient permissions to execute the query」のエラーがでたときの対処法)がとても参考になりました。

雑感

  • BME280にドライヤー攻撃をしたところ1個BME280を壊してしまいました。センサーは意外にデリケートです。復活しました。
  • FirehoseからS3へ出力する際に改行コード等のセパレータが指定できないのは不便。将来指定できるようになるといいなー。
  • 次はSageMakerにデータを入れてみようかな。