IoTとBME280で遊んでみた - 2
AWS IoTに送信したセンサー(BME280)のデータを、Kinesis Analyticsで分析してみました。
必要なもの
- AWSアカウント
- Raspberry Pi
- BME280 センサー
データの流れ
センサー(BME280)からAWS IoTに送信した温度・湿度・気圧データに対し、Kinesis Data Analyticsで異常検知のデータ(ANOMALY SCORE)を付加してS3に出力し、Athena + QuickSightでグラフ化してみました。
前回(AWS IoTとBME280で遊んでみた)は、BME280からAWS IoTへ温度・湿度・気圧データを送信するところまででしたが、それ以降のデータの流れの部分を作成します。
- BME280からAWS IoTへ温度・湿度・気圧データを送信
- AWS IoTルールにより受信した温度・湿度・気圧データをKinesis Data Firehoseへ送信
- Kinesis Data Firehoseで受けたデータをS3(バケット:iot-bme280)に出力すると同時に、Kinesis Data AnalyticsでANOMALY SCOREを付加し、別のKinesis Data Firehoseに送信
- Kinesis Data Firehoseで受けたANOMALY SCORE付データをS3(バケット:iot-bme280-2)に出力
- 上記S3出力は、JSON Lines形式から改行を省いた形式になっているので、Lambdaで各JSON毎に改行を入れてJSON Lines形式に変更し、S3(バケット:iot-bme280-3)へ出力
- S3(バケット:iot-bme280-3)に対し、Glueのクローラーでテーブルを定義してAthenaでクエリを実行
- QuickSightでグラフ化
0. サンプルプログラム(basicPubSub.py)の修正
前回の修正では現在日時は送信していませんでしたが、現在日時も送信するようにサンプルプログラム(basicPubSub.py)を修正してRaspberryPi上で実行します。
# Publish to the same topic in a loop forever
loopCount = 0
while True:
if args.mode == 'both' or args.mode == 'publish':
temperature, pressure, humidity = bme280.read_bme280()
message = {}
message['datetime'] = str(datetime.utcnow())
1. S3バケット2個、Kinesis Data Firehose2個の作成
Kinesis Data Firehose2個とそれぞれのFirehoseの出力用にS3バケット2個作成します。
Firehose名 | S3バケット名 | |
---|---|---|
1段目 | bme280 | iot-bme280 |
2段目 | bme280-2 | iot-bme280-2 |
2. AWS IoT ルールの作成
"MyTopic"というトピック名で受信したデータを全てKinesis Data Firehose(名前:bme280)に送信するルールを作成します。
※上記のように、Separatorは"\n (改行)"を選択します。
この時点でS3バケット(iot-bme280)にデータがたまりはじめます。
3. Kinesis Data Analyticsの設定
下記のようにFirehoseをsourceとdestinationに指定します。
source | destination |
---|---|
bme280 | bme280-2 |
SQLは、Example: Detecting Data Anomalies on a Streamを参考にRANDOM_CUT_FOREST関数を実装し、データにANOMALY SCOREを付加します。
SQL部分
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("datetime" TIMESTAMP, "temperature" REAL,"humidity" DOUBLE, "pressure" DOUBLE, ANOMALY_SCORE DOUBLE);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM
"datetime","temperature","humidity","pressure",
ANOMALY_SCORE
FROM TABLE(RANDOM_CUT_FOREST(CURSOR(SELECT STREAM "datetime", cast("temperature" as DOUBLE) as"temperature","humidity", "pressure" FROM "SOURCE_SQL_STREAM_001")));
ここまでで、この時点でS3バケット(iot-bme280-2)にANOMALY SCOREが付加されたデータがたまりはじめます。
4. LambdaとS3バケットの作成
Firehoseは複数のJSONが1行にくっついた形式でS3に出力しますがこれではAthenaで読めないため、1行1JSONのJSON Lines形式に変更する必要があります。
その変換用のLambdaと変換後のデータを出力するS3バケット(iot-bme280-3)を作成し、S3バケット(iot-bme280-2)に対しオブジェクトが作成されたらLambdaを起動するようにイベントを設定します。
- Firehoseの出力形式 = 複数のJSONが1行にくっついている形式
{"datetime":"2018-03-12 12:01:13.995","temperature":15.41,"humidity":72.4736328125,"pressure":1021.46828125,"ANOMALY_SCORE":1.2377089110080235}{"datetime":"2018-03-12 12:01:15.265","temperature":15.41,"humidity":72.4736328125,"pressure":1021.5043359375001,"ANOMALY_SCORE":1.2386195886695075}{"datetime":"2018-03-12 12:01:16.535","temperature":15.41,"humidity":72.4638671875,"pressure":1021.46828125,"ANOMALY_SCORE":1.2401199616651675}
- Athenaで読める形式
{"datetime":"2018-03-12 12:01:13.995","temperature":15.41,"humidity":72.4736328125,"pressure":1021.46828125,"ANOMALY_SCORE":1.2377089110080235}
{"datetime":"2018-03-12 12:01:15.265","temperature":15.41,"humidity":72.4736328125,"pressure":1021.5043359375001,"ANOMALY_SCORE":1.2386195886695075}
{"datetime":"2018-03-12 12:01:16.535","temperature":15.41,"humidity":72.4638671875,"pressure":1021.46828125,"ANOMALY_SCORE":1.2401199616651675}
- 変換用Lambda
import boto3
import urllib.parse
BUCKET_OUT = 'iot-bme280-3'
s3 = boto3.resource('s3')
def lambda_handler(event, context):
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
s3obj_in = s3.Object(bucket, key)
body_in = s3obj_in.get()['Body'].read().decode('utf-8')
body_out = body_in.replace('}{', '}\n{')
s3obj_out = s3.Object(BUCKET_OUT, key)
s3obj_out.put(Body=body_out.encode('utf-8'))
※ 変換部分はちょっと手抜きしてます。
この時点でS3バケット(iot-bme280-3)にAthenaで読めるデータがたまりはじめます。
5. Glue Data CatalogにTableを作成
GlueでS3バケット(iot-bme280-3)に対しクロールして、Data CatalogにTableを作成します。
AthenaでSQLを実行してデータが取得できることを確認します。
※変換用Lambdaで出力したS3のキーのフォーマットがHiveフォーマットでないため、クロールした時点で存在するパーティションのデータしか確認できません。再度クロールすれば、新しくできたパーティションのデータを確認できるようになります。
https://docs.aws.amazon.com/ja_jp/athena/latest/ug/partitions.html
- 現在のフォーマット s3://iot-bme280-3/2018/03/14/19/..
- Hiveフォーマット s3://iot-bme280-3/year=2018/month=03/day=14/hour=19/..
6. QuickSightでグラフを作成
QuickSightの画面で以下のところで
[New Analysis] -> [data set] -> [New Data Set] -> [Athena] -> [Create Data Set]
作成したTableを指定してグラフを作成します。
山になっているところは、BMS280に対しドライヤー攻撃をした結果になります。
温度の急上昇に合わせAnamaly Scoreも上昇していることが確認できます。
はまりポイント
1. RANDOM_CUT_FOREST関数がうまく動作しない
温度は 15.25 のように整数2桁小数2桁ですが多分DECIMALと判断されてしまったため、RANDOM_CUT_FOREST関数に入力しても入力データとしては無視されました。(BME280にドライヤー攻撃してもANOMALY_SCOREの値が全然変化しない)
そのため、明示的にDOUBLEに型変換してからRANDOM_CUT_FOREST関数に入力しています。
RANDOM_CUT_FOREST
-- DECIMAL is not a supported type. Use DOUBLE instead. --
2. QuickSightがエラーになる
QuickSightからAthena経由でアクセスする場合は、S3へのアクセス権限を設定する必要があります。
こちらの記事(Amazon QuickSightで「Insufficient permissions to execute the query」のエラーがでたときの対処法)がとても参考になりました。
雑感
-
BME280にドライヤー攻撃をしたところ1個BME280を壊してしまいました。センサーは意外にデリケートです。復活しました。
- FirehoseからS3へ出力する際に改行コード等のセパレータが指定できないのは不便。将来指定できるようになるといいなー。
- 次はSageMakerにデータを入れてみようかな。
Author And Source
この問題について(IoTとBME280で遊んでみた - 2), 我々は、より多くの情報をここで見つけました https://qiita.com/sirotosiko/items/e1be03d0335eb584e531著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .