EMQ Xルールエンジンシリーズ(六)DynamoDBデータベースへのメッセージの格納
7366 ワード
Amazon DynamoDB紹介
Amazon DynamoDBは、キー値とドキュメントデータ構造をサポートする完全に管理されているNoSQLデータベースサービスです.
Amazon DynamoDBはアマゾンがAWSクラウド製品の組み合わせの一部として提供し、迅速で予想可能な性能を提供し、シームレスな拡張を実現することができます.
Amazon DynamoDBサービスアドレス:
https://aws.amazon.com/dynamodb/
シーン紹介
このシーンでは、EMQXがトピックの下に指定し、条件を満たすメッセージをDynamoDBデータベースに格納する必要があります.後続の分析検索を容易にするためには、メッセージコンテンツを分割して格納する必要があります.
このシナリオでは、デバイス側から次の情報が報告されます.レポートテーマ:cmd/state/:id、テーマidは車両クライアント識別コード を表すメッセージボディ: 報告されたデータエンジン回転数が
準備作業
DynamoDBデータテーブルの定義
シーン要件に応じて、定義データテーブル
use_statistics.json
DynamoDBデータテーブルの作成
aws cliコマンドによりデータテーブル
作成に成功したらaws cliコマンドでデータテーブルが存在するかどうかを確認します.
構成の説明
リソースの作成
EMQ X Dashboardを開き、左側のメニューのリソースページに入り、新規ボタンをクリックし、DynamoDBサーバ情報を入力してリソース作成を行います.
EMQ Xクラスタ内のノードが存在するネットワーク環境は互いに異なる可能性があり、リソースの作成に成功した後、リスト内のステータスボタンをクリックし、各ノードのリソース接続状況を確認し、ノード上のリソースが使用できない場合は、構成が正しいかどうか、ネットワーク接続性を確認し、再接続ボタンをクリックして手動で再接続してください.
ルールの作成
左側のメニューのルールページに入り、新規ボタンをクリックしてルール作成を行います.ここでは、イベント・メッセージのパブリケーションをトリガーすることを選択し、メッセージのパブリケーション時にルールをトリガーしてデータ処理を行います.
トリガイベントを選択すると、オプションフィールドとサンプルSQLがインタフェースに表示されます.
必要なフィールドのフィルタ
ルール・エンジンはSQL文を使用してルール条件を処理します.このビジネスでは、
フィルタ条件の確立
SQL文WHERE文を使用して条件フィルタを行います.このビジネスでは、2つの条件を定義する必要があります.は .は .
前のステップを組み合わせてSQLは次のようになります.
SQLテスト機能による出力テスト
SQLテスト機能を使用すると、payloadなどのシミュレーション元のデータを指定する必要がある現在のSQL処理後のデータ出力をリアルタイムで表示できます.
payloadデータは、SQL条件を満たすために
SQLテスト切替ボタンをクリックし、
テスト出力データは次のとおりです.
テスト出力は予想通りで、後続のステップを行うことができます.
応答アクションを追加し、DynamoDBにメッセージを格納
SQL条件入力出力に誤りがない場合は、引き続き対応するアクションを追加し、SQL文の書き込みを構成し、フィルタ結果をDynamoDBに格納します.
応答動作の追加ボタンをクリックし、データをDynamoDBに保存する動作を選択し、選択したリソースを選択し、入力
DynamoDBテーブル名、Hash KeyおよびRange Key.
テスト
予想される結果
処理アクションを含むルールを作成しました.アクションの期待効果は次のとおりです.デバイスが DynamoDBの
DashboardのWebsocketツールを使用してテスト
ツール-->Websocketページに切り替え、任意の情報クライアントを使用してEMQ Xに接続し、接続に成功した後、メッセージカードに次の情報を送信します.トピック:cmd/state/NXP-0586599730253-96394511822721-22 メッセージボディ:
送信ボタンをクリックすると、送信に成功した後、現在のルールがヒットした統計値が1であることがわかります.
aws cliコマンドラインのデータテーブルレコードを表示すると、次のようになります.
これにより,ルールエンジンを用いてDynamoDBにメッセージを格納するビジネス開発を実現した.
詳細については、公式サイトemqxにアクセスしてください.io、あるいは私たちのオープンソースプロジェクトgithubに注目します.com/emqx/emqx、詳細は公式ドキュメントにアクセスしてください.
Amazon DynamoDBは、キー値とドキュメントデータ構造をサポートする完全に管理されているNoSQLデータベースサービスです.
Amazon DynamoDBはアマゾンがAWSクラウド製品の組み合わせの一部として提供し、迅速で予想可能な性能を提供し、シームレスな拡張を実現することができます.
Amazon DynamoDBサービスアドレス:
https://aws.amazon.com/dynamodb/
シーン紹介
このシーンでは、EMQXがトピックの下に指定し、条件を満たすメッセージをDynamoDBデータベースに格納する必要があります.後続の分析検索を容易にするためには、メッセージコンテンツを分割して格納する必要があります.
このシナリオでは、デバイス側から次の情報が報告されます.
{
"id": "NXP-058659730253-963945118132721-22", //
"speed": 32.12, //
"direction": 198.33212, //
"tachometer": 3211, // , 8000
"dynamical": 8.93, //
"location": { // GPS
"lng": 116.296011,
"lat": 40.005091
},
"ts": 1563268202 //
}
8000
より大きい場合、現在の情報は、ユーザ車両の使用状況を後で分析するために記憶される.準備作業
DynamoDBデータテーブルの定義
シーン要件に応じて、定義データテーブル
use_statistics
は、以下のように構成される.use_statistics.json
{
"TableName": "use_statistics",
"KeySchema": [
{ "AttributeName": "client_id", "KeyType": "HASH" },
{ "AttributeName": "id", "KeyType": "RANGE" }
],
"AttributeDefinitions": [
{ "AttributeName": "client_id", "AttributeType": "S" },
{ "AttributeName": "id", "AttributeType": "S" }
],
"ProvisionedThroughput": {
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
}
}
DynamoDBデータテーブルの作成
aws cliコマンドによりデータテーブル
use_statistics
を作成します.$ aws dynamodb create-table --cli-input-json file://use_statistics.json --endpoint-url http://localhost:8000
{
"TableDescription": {
"AttributeDefinitions": [
{
"AttributeName": "client_id",
"AttributeType": "S"
},
{
"AttributeName": "id",
"AttributeType": "S"
}
],
"TableName": "use_statistics",
"KeySchema": [
{
"AttributeName": "client_id",
"KeyType": "HASH"
},
{
"AttributeName": "id",
"KeyType": "RANGE"
}
],
"TableStatus": "ACTIVE",
"CreationDateTime": 1563765603.777,
"ProvisionedThroughput": {
"LastIncreaseDateTime": 0.0,
"LastDecreaseDateTime": 0.0,
"NumberOfDecreasesToday": 0,
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
},
"TableSizeBytes": 0,
"ItemCount": 0,
"TableArn": "arn:aws:dynamodb:ddblocal:000000000000:table/use_statistics",
"BillingModeSummary": {
"BillingMode": "PROVISIONED",
"LastUpdateToPayPerRequestDateTime": 0.0
}
}
}
作成に成功したらaws cliコマンドでデータテーブルが存在するかどうかを確認します.
$ aws dynamodb list-tables --region us-west-2 --endpoint-url http://127.0.0.1:8000
{
"TableNames": [
"use_statistics"
]
}
構成の説明
リソースの作成
EMQ X Dashboardを開き、左側のメニューのリソースページに入り、新規ボタンをクリックし、DynamoDBサーバ情報を入力してリソース作成を行います.
EMQ Xクラスタ内のノードが存在するネットワーク環境は互いに異なる可能性があり、リソースの作成に成功した後、リスト内のステータスボタンをクリックし、各ノードのリソース接続状況を確認し、ノード上のリソースが使用できない場合は、構成が正しいかどうか、ネットワーク接続性を確認し、再接続ボタンをクリックして手動で再接続してください.
ルールの作成
左側のメニューのルールページに入り、新規ボタンをクリックしてルール作成を行います.ここでは、イベント・メッセージのパブリケーションをトリガーすることを選択し、メッセージのパブリケーション時にルールをトリガーしてデータ処理を行います.
トリガイベントを選択すると、オプションフィールドとサンプルSQLがインタフェースに表示されます.
必要なフィールドのフィルタ
ルール・エンジンはSQL文を使用してルール条件を処理します.このビジネスでは、
payload
のすべてのフィールドを個別に選択し、payload.fieldName
形式で選択する必要があります.また、メッセージ・コンテキストのtopic
、qos
、id
の情報も必要です.現在のSQLは次のとおりです.SELECT
payload.id as client_id, payload.speed as speed,
payload.tachometer as tachometer,
payload.ts as ts, id
FROM
"message.publish"
WHERE
topic =~ 't/#'
フィルタ条件の確立
SQL文WHERE文を使用して条件フィルタを行います.このビジネスでは、2つの条件を定義する必要があります.
cmd/state/:id
トピックのみを処理し、=~
はトピックワイルドカードtopic
を使用してフィルタリングする:topic =~ 'cmd/state/+'
tachometer > 8000
のメッセージのみを処理し、比較子を使用してtachometer
をフィルタリングする:payload.tachometer > 8000
前のステップを組み合わせてSQLは次のようになります.
SELECT
payload.id as client_id, payload.speed as speed,
payload.tachometer as tachometer,
payload.ts as ts,
id
FROM
"message.publish"
WHERE
topic =~ 'cmd/state/+'
AND payload.tachometer > 8000
SQLテスト機能による出力テスト
SQLテスト機能を使用すると、payloadなどのシミュレーション元のデータを指定する必要がある現在のSQL処理後のデータ出力をリアルタイムで表示できます.
payloadデータは、SQL条件を満たすために
tachometer
の数値サイズを変更することに注意してください.{
"id": "NXP-058659730253-963945118132721-22",
"speed": 32.12,
"direction": 198.33212,
"tachometer": 9001,
"dynamical": 8.93,
"location": {
"lng": 116.296011,
"lat": 40.005091
},
"ts": 1563268202
}
SQLテスト切替ボタンをクリックし、
topic
とpayload
をシーンの情報に変更し、テストボタンをクリックしてデータ出力を表示する.テスト出力データは次のとおりです.
{
"client_id": "NXP-058659730253-963945118132721-22",
"id": "589A429E9572FB44B0000057C0001",
"speed": 32.12,
"tachometer": 9001,
"ts": 1563268202
}
テスト出力は予想通りで、後続のステップを行うことができます.
応答アクションを追加し、DynamoDBにメッセージを格納
SQL条件入力出力に誤りがない場合は、引き続き対応するアクションを追加し、SQL文の書き込みを構成し、フィルタ結果をDynamoDBに格納します.
応答動作の追加ボタンをクリックし、データをDynamoDBに保存する動作を選択し、選択したリソースを選択し、入力
DynamoDBテーブル名、Hash KeyおよびRange Key.
テスト
予想される結果
処理アクションを含むルールを作成しました.アクションの期待効果は次のとおりです.
cmd/state/:id
トピックにメッセージを報告すると、メッセージのtachometer
の数値が8000を超えるとSQLにヒットし、ルールリストのヒット数が1増加する.use_statistics
表に、現在のメッセージと一致するデータが追加されます.DashboardのWebsocketツールを使用してテスト
ツール-->Websocketページに切り替え、任意の情報クライアントを使用してEMQ Xに接続し、接続に成功した後、メッセージカードに次の情報を送信します.
{
"id": "NXP-058659730253-963945118132721-22",
"speed": 32.12,
"direction": 198.33212,
"tachometer": 9001,
"dynamical": 8.93,
"location": {
"lng": 116.296011,
"lat": 40.005091
},
"ts": 1563268202
}
送信ボタンをクリックすると、送信に成功した後、現在のルールがヒットした統計値が1であることがわかります.
aws cliコマンドラインのデータテーブルレコードを表示すると、次のようになります.
これにより,ルールエンジンを用いてDynamoDBにメッセージを格納するビジネス開発を実現した.
詳細については、公式サイトemqxにアクセスしてください.io、あるいは私たちのオープンソースプロジェクトgithubに注目します.com/emqx/emqx、詳細は公式ドキュメントにアクセスしてください.