【AWS IoT】AWS IoT Core ルールを使ったデータストアを試してみた(その2)


前回 【AWS IoT】AWS IoT Core ルールを使ったデータストアを試してみた(その1)

の続きです。

前提

  • AWS IOT CORE 初級ハンズオンを参考にしています。
  • 送信元デバイスは NVIDIA Jetson Nano 開発者キット B01 を利用
  • AWS IoT Core のコンパネで「モノ」を作成して、「モノ」用の証明書も作成してデバイスに保存
  • デバイスにはAWS IoT Device SDK をインストール
  • クライアントプログラムはAWS IOT CORE 初級ハンズオンのダミーデバイス用プログラム( device_main.py)を利用
  • デバイスからIoT Coreに向けてMQTTで10秒おきにデータ送信 → 30秒おきに変更
  • データはこんな感じ
{
  "DEVICE_NAME": "jetson-nano-tsumida",
  "TIMESTAMP": "2021-03-17T22:03:48",
  "TEMPERATURE": 18,
  "HUMIDITY": 57
}

2-1.データのストア先と使ったルール(アクション)

※試してみたいものをチョイスしたリスト

# ストア先 使ったルール(アクション) 途中経由したサービス
1 S3 Amazon Kinesis Firehose ストリームにメッセージを送信する Amazon Kinesis Firehose
2 DynamoDB Amazon Kinesis ストリームにメッセージを送信する Amazon Kinesis Data Streams,Lambda
3 timestream Timestreamテーブルにメッセージを書き込む なし
4 Elasticsearch Amazon Elasticsearch Serviceにメッセージを送信する なし
5 AWS IoT Analytics IoT Analyticsにメッセージを送る IoT Analytics内でchannel,pipeline,storageを経由

今回はこの2 「Amazon Kinesis ストリームにメッセージを送信する」 でLamdaでDyanamoDBに保存するをやっていきます。

2-2-2. 「Amazon Kinesis ストリームにメッセージを送信する」 でLamdaでDyanamoDBに保存する

Dyanamo DBはアプリケーションが参照するためのNoSQL データストアです。
kinesis Data Streams に流すことで大量データのリアルタイム処理が効率的に行えます。

① Amazon Kinesis Data Streamsの作成

まずKinesis streams コンソールでストリームを作成します。

データストリームの作成を押して

データストリーム名に名前をつけます。(今回はjetson-nano-tsumidaと入れました。)
開いているシャード数は1にします。シャード数分コストはかかりますが処理性が上がります。

ステータスがアクティブになれば完了です。

② DynamoDB テーブルの作成

次にDyanamoDBのテーブルを作ります。

テーブルの作成を押します。

DyanamoDB テーブルの作成画面で
テーブル名に名称(今回はjetson-nano-tsumida-yyyymmddと入力)
パーティションキーに deviceid と入力、ソートキーの追加にチェックして timestamp を入力
これでテーブルを作ります。

これはすぐできると思います。

③ IAMロールの作成

Lambda用のロールと、AWS IoT 用のロールを作ります。

まずLambda用のロールです。
IAM のロールメニューで「ロールの作成」を押します。

一般的なユースケースのLambdaを選んで次の「ステップ:アクセス権限」を押します。

以下の3つにチェックを入れ「次のステップ:タグ」を押します。

  • AWSLambdaBasicExecutionRole
  • AmazonKinesisReadOnlyAccess
  • AmazonDynamoDBFullAccess

※今回はセキュリティ面で絞るべきところは考慮外とします。

タグは付けずに次に行きます。
ロール名に名前を付け(今回はjetson-nano-tsumida-lambda とつけました)
ポリシーにチェックした3つが入っているかを確認してロールの作成を押します。

次にAWS IoT 用のロールです
同様にIAMのロール画面でIoTを選び、「次のステップ:アクセス権限」を押します。

標準で必要な3つのポリシーがついてるのでそのまま次に行きます。

タグは付けずに次に行きます。
ロール名に名前を付け(今回はjetson-nano-tsumida-iot とつけました)ロールの作成を押します。

④ Lmabdaの設定

次はLambdaを設定していきます。

関数の作成を押す

一から作成を選ぶ
関数名に名前を付ける(今回は jetson-nano-tsumida-yyyymmdd とつけました。)
ランタイムはpython3.8を選択

デフォルトの実行ロールの変更 を押し、既存のロールを使用する にチェックをして
③ IAMロールの作成で作成したLambda用のロールを選び、関数の作成を押します。

次にトリガーを追加します。「+トリガーを追加」ボタンを押します。

トリガーの選択でKinesisを選びます。

Kinesis ストリームは、① Amazon Kinesis Data Streamsの作成 で作成したストリームを選びます。

次にコードをタブでコードを書きます。

lambda_function.pyをダブルクリックしてコードを書きます。

※今回のlambdaのコードは、AWS IoT ハンズオン用に用意されているここを参照してください。

Deployを押します。

次にこのpythonのコードが使う環境変数(Dynamoテーブルなど)を設定します。
上タブの設定、左メニューの環境変数を選び編集を押す。

キーに TABLE_NAME
値に②DynamoDB テーブルの作成で作ったテーブル名を入れ保存する

これで Kinesis → lambda → DynamoDBの部分の設定完了です。
次にIoT Coreのルールとアクションを設定します。

⑤ IoT Core ルール アクションの設定

前回と同様にルールとアクションを設定していきます。
左メニュー「ACT」の下にあるルールをクリックし、画面右上の「作成」をクリックします。

ルールの作成画面が出てくるのでルールに名前を付け、クエリを書きます。

ルール名はjetson_nano_tsumida_kinesisとしました。クエリは以下です。

 select * from 'data/jetson-nano-tsumida'

次に「アクションの追加」をクリックして、
「Amazon Kinesis ストリームにメッセージを送信する」を選択して「アクションの設定」をクリックします。

アクションの設定画面が表示されるので
ストリーム名に ① Amazon Kinesis Data Streamsの作成 で作ったもの
パーティションキーに ${newuuid()} を入力
ロールは③ IAMロールの作成 で作ったAWS IOT 用のロールを選び「ロールの更新」を押します。

アタッチされたポリシーと表示されたらアクションの追加を押します。

元の画面に戻るので「ルールの作成」を押します。

⑥ データを確認する

AWS IOT CORE 初級ハンズオンのダミーデバイス用プログラム(device_main.py)でAWS IoT Coreに向けてデータを送り続けた状態にしています。※前回と変更して30秒おきに送っています。

  • Kinesis Data streams に流れたデータ確認

kinesis のコンソールで 作成したデータストリームのストリームメトリクスを見てみます。
レコードを取得 -合計(数)とput レコード - 合計(バイト)が増えていれば、データが流れている という事になります。


  • DynamoDB にたまったデータの確認

Dynamo のコンソールのテーブルで作成したテーブルをクリックし、「項目」タブを選ぶと送信されたデータが確認できます。
ちゃんと届入れいれば、1件ごとに1行表示されます。

データをクリックするとこんな感じで中が見れます。

DyanamoDBへの溜め方は以上です。

⑤ 今回のまとめ(IoT Core → ルール → kinesis Data Streams → Lambda → DyanamoDB)

IoT Core に届いたデータをKinesis Data streams を経由してDynamoDBへ入れてみました。
IoT CoreからDyanamoへは直接データを入れることもできますが、Kinesis Data Streams とLambdaを使うことで
DyanamoDBへストアする際に並列処理をしたりできるので、処理性能を上げることができます。

次回は
timestream とElastic Searchへ入れてみた を書きます。

追記:次回分リンク。長くなったのでElasticsearchはさらに次回に
【AWS IoT】AWS IoT Core ルールを使ったデータストアを試してみた(その3)