【AWS IoT】AWS IoT Core ルールを使ったデータストアを試してみた(その3)


前回 【AWS IoT】AWS IoT Core ルールを使ったデータストアを試してみた(その2)

の続きです。

第1回目はこちら 【AWS IoT】AWS IoT Core ルールを使ったデータストアを試してみた(その1)

前提

  • AWS IOT CORE 初級ハンズオンを参考にしています。
  • 送信元デバイスは NVIDIA Jetson Nano 開発者キット B01 を利用
  • AWS IoT Core のコンパネで「モノ」を作成して、「モノ」用の証明書も作成してデバイスに保存
  • デバイスにはAWS IoT Device SDK をインストール
  • クライアントプログラムはAWS IOT CORE 初級ハンズオンのダミーデバイス用プログラム( device_main.py)を利用
  • デバイスからIoT Coreに向けてMQTTで30秒おきにデータ送信
  • データはこんな感じ
{
  "DEVICE_NAME": "jetson-nano-tsumida",
  "TIMESTAMP": "2021-03-17T22:03:48",
  "TEMPERATURE": 18,
  "HUMIDITY": 57
}

2-1.データのストア先と使ったルール(アクション)

※試してみたいものをチョイスしたリスト

# ストア先 使ったルール(アクション) 途中経由したサービス
1 S3 Amazon Kinesis Firehose ストリームにメッセージを送信する Amazon Kinesis Firehose
2 DynamoDB Amazon Kinesis ストリームにメッセージを送信する Amazon Kinesis Data Streams,Lambda
3 timestream Timestreamテーブルにメッセージを書き込む なし
4 Elasticsearch Amazon Elasticsearch Serviceにメッセージを送信する なし
5 AWS IoT Analytics IoT Analyticsにメッセージを送る IoT Analytics内でchannel,pipeline,storageを経由

今回はこの3 「Timestreamテーブルにメッセージを書き込む」をやっていきます。

2-2-3. 「Timestreamテーブルにメッセージを書き込む」

Amazon Timestreamは時系列データを保存する事に特化したマネージドサービスです。

Amazon Timestream公式URL

公式から引用

Amazon Timestream は、IoT および運用アプリケーションに適した、高速かつスケーラブルなサーバーレス時系列データベースサービスです。リレーショナルデータベースの最大 1,000 倍の速度と 10 分の 1 のコストで、1 日あたり数兆ものイベントを、簡単に保存し、分析できます。

2020年9月30日GAになっています。

米国東部 (バージニア北部)、米国東部 (オハイオ)、米国西部 (オレゴン)、および欧州 (アイルランド) で利用可能

現在公開リージョンは上記です。

東京リージョンでアクセスしてみましたが、2021/06/29時点でまだでした。

TimestreamはAWS IOT CORE 初級ハンズオン では対象になっていませんが、ちょっと試してみたいのでやってみました。リージョンは米国東部 (オハイオ) で試しています。

① Amazon Timestream データベースの作成

Amazon Timestream のコンソール画面で「Create database」を押す

  • Standard Database を選択、データベース名を入力(今回は jetson-nano-tsumida と入れました。)
  • Encryptionの個所では暗号化キーを選びます。

KMSからキーを選ぶことになりますが、ここで何も入れないとKMSのエイリアス aws/timestream を使用してキーが作成されます。
今回はそのままで行きます。(一度登録したことがあるので選択された状態になっていますが、初回は空白)

※Amazon timestream は標準ですべてのデータが暗号化されます。

入力後、画面下部のCreate Databaseを押します。
DBはすぐできます。

次にテーブルを作ります。

② Amazon Timestream テーブルの作成

作成したデータベースのリンクをクリックするとこの画面に遷移します。

画面中央のTable タブをクリックしてCreate Table を押します。

  • テーブル作成画面でまずテーブル名を入れます。(今回はjetson-nano-tsumida-ts-table と入れました。)
  • Data retention を設定します。

Timestreamにデータを送ると最初はメモリストアに乗ります、Memory store retentionはメモリストアにどれくらい残しておくか を設定します。Magnetic store retention はメモリから永続ストアに移した後の保存期間 を設定します。

Memory store retentionについての補足として、設定した期間より古いデータは取込めなくなるので要注意です。
 例:Memory store retention を1h にした場合、15:30 に 14:29 のタイムスタンプデータを投入しようとしても投入できない。

今回は
Memory store retention: 12Hour(s)
Magnetic store retention: 1Month(s)
で設定してみました。

入力後「Create Table」を押すとテーブルが作成されます

③ IoT Core ルール アクションの設定

前回と同様にルールとアクションを設定していきます。IoT Core IoT Core 左メニュー「ACT」の下にあるルールをクリックし、画面右上の「作成」をクリックします。

ルールの作成画面が出てくるのでルールに名前を付け、クエリを書きます。

ルール名はjetson_nano_tsumida_tsとしました。クエリは以下です。

 select TEMPERATURE,HUMIDITY from 'data/jetson-nano-tsumida'

次に「アクションの追加」をクリックして、
「Timestreamテーブルにメッセージを書き込む」を選択して「アクションの設定」をクリックします。

アクションの設定画面で

  • データベース名とテーブル名
  • ディメンジョン名とディメンジョン値

を設定します。
データベース名は ① Amazon Timestream データベースの作成で作成したもの
テーブル名は ② Amazon Timestream テーブルの作成で作成したものを入れます。

ディメンジョン名は、データの固有ラベルのようなもので、ここで定義したディメンジョン名の列がtimestreamのデータに追加され、ディメンション値がすべてのデータに付与されます。
今回はdevice_id${clientId()} と入れました。clientId()は送信デバイスのIDを自動で設定してくれます。

次に

  • タイムスタンプの値と単位

を設定します。

今回は${timestamp()}MILLISECONDS と入れました。

${timestamp()} はデータを受け取った時刻をエポック秒で入れてくれます。データ形式はエポック秒である必要があります。

Timestamp設定でハマったポイント

どうせなら「送信時にデバイスで付与した時間」で格納しようと思い。
(送信データにはこんな感じでTimestampが入っている→ "TIMESTAMP": "2021-03-17T22:03:48"

この「送信データのtimestamp」を「TimestreamのTIMESTAMP値」として格納するには、エポックタイムへの変換が必要です。
IoT Coreのルールクエリステートメントで変換できそうなのでやってみました。

 select time_to_epoch(TIMESTAMP, "yyyy-MM-dd'T'HH:mm:ss") AS EPOCH_TIME,TEMPERATURE,HUMIDITY from 'data/jetson-nano-tsumida'

これでペイロードのtimestampはエポックタイムに変換されます。
「Timestream のアクション設定」の「タイムスタンプ値」に変換後の値 ${EPOCH_TIME} を設定して動かしてみました。
でもうまく動きませんでした。(「タイムスタンプ値」が''で飛んでいた)

S3にデータを送って確認すると
{"EPOCH_TIME":1624991287000,"TEMPERATURE":16,"HUMIDITY":40}
となっているので、変換は問題なさそうです。

色々検索した結果、どうやら「Timestreamのアクション」ではルールクエリステートメントで変換したものは認識されないようです。
「変換したもの」とは例えばFunctionなどを使って変換したものです。変換したものはAS でaliasの列名を定義してたりします。

この変換したものが「timestreamのアクション」には渡っておらず、IoT Coreが受け取った送信ペイロードの内容をそのままを使ってしまうようです。
厳密にいうと、「Timestreamのアクション」で簡単な演算(例えば四則演算など)はできそうです。

参考 ディスカッションフォーラムのtimestreamスレッド

TimestreamアクションでIoT Coreから直接連携する場合、payloadのtimestampや各valueなどは最終的に入れたい形で送る必要がありそうです。(lambdaなどを経由すればなんでもできるとは思います。)

今回は送信データをエポックタイムにする事はせず、timestamp値を ${timestamp()} (IoT Coreで処理時に取得した時間)でやりました。(そちらも試したかったという事もあり)


設定続き
+ 最後にロールを作ります。

ここでロールの作成を押しロールの名前を付けます。

今回はjetson-nano-tsumdia-ts-actionとつけました。
これでアクションの作成、ルールの作成を順に押してルールアクションの設定完了です。

④ データを確認する

Amazon timestream のコンソールを開き、データベース名をクリック、テーブル名をクリック してこの画面を開きます。

画面右上の「Query table」を押し、表示されたSQLをそのまま実行します。
このようにHUMIDITYとTEMPERATUREが格納されているのが見れました。

⑤ 今回のまとめ(IoT Core → ルール → Timestream)

IoT Core に届いたデータを直接timestreamに入れてみました。
設定自体は簡単でした。すぐにできます。
ただ、現状のアクション機能ではペイロード値の変換ができないので、入れたい形式のままエッジから送るか
Dynamoと同じようにkinesis → lambdaを経由して入れるのが良いかもしれません。

今回はここまでにします。次回は 4「Amazon Elasticsearch Serviceにメッセージを送信する」です。

次回 【AWS IoT】AWS IoT Core ルールを使ったデータストアを試してみた(その4)