EMQ Xルールエンジンシリーズ(六)DynamoDBデータベースへのメッセージの格納

7366 ワード

Amazon DynamoDB紹介
Amazon DynamoDBは、キー値とドキュメントデータ構造をサポートする完全に管理されているNoSQLデータベースサービスです.
Amazon DynamoDBはアマゾンがAWSクラウド製品の組み合わせの一部として提供し、迅速で予想可能な性能を提供し、シームレスな拡張を実現することができます.
Amazon DynamoDBサービスアドレス:
https://aws.amazon.com/dynamodb/
シーン紹介
このシーンでは、EMQXがトピックの下に指定し、条件を満たすメッセージをDynamoDBデータベースに格納する必要があります.後続の分析検索を容易にするためには、メッセージコンテンツを分割して格納する必要があります.
このシナリオでは、デバイス側から次の情報が報告されます.
  • レポートテーマ:cmd/state/:id、テーマidは車両クライアント識別コード
  • を表す
  • メッセージボディ:
    {
      "id": "NXP-058659730253-963945118132721-22", //       
      "speed": 32.12, //     
      "direction": 198.33212, //     
      "tachometer": 3211, //      ,     8000      
      "dynamical": 8.93, //     
      "location": { // GPS      
        "lng": 116.296011,
        "lat": 40.005091
      },
      "ts": 1563268202 //     
    }
  • 報告されたデータエンジン回転数が8000より大きい場合、現在の情報は、ユーザ車両の使用状況を後で分析するために記憶される.
    準備作業
    DynamoDBデータテーブルの定義
    シーン要件に応じて、定義データテーブルuse_statisticsは、以下のように構成される.
    use_statistics.json
    {
        "TableName": "use_statistics",
        "KeySchema": [
            { "AttributeName": "client_id", "KeyType": "HASH" },
            { "AttributeName": "id", "KeyType": "RANGE" }
        ],
        "AttributeDefinitions": [
            { "AttributeName": "client_id", "AttributeType": "S" },
            { "AttributeName": "id", "AttributeType": "S" }
        ],
        "ProvisionedThroughput": {
            "ReadCapacityUnits": 5,
            "WriteCapacityUnits": 5
        }
    }

    DynamoDBデータテーブルの作成
    aws cliコマンドによりデータテーブルuse_statisticsを作成します.
    $ aws dynamodb create-table --cli-input-json file://use_statistics.json --endpoint-url http://localhost:8000
    {
        "TableDescription": {
            "AttributeDefinitions": [
                {
                    "AttributeName": "client_id",
                    "AttributeType": "S"
                },
                {
                    "AttributeName": "id",
                    "AttributeType": "S"
                }
            ],
            "TableName": "use_statistics",
            "KeySchema": [
                {
                    "AttributeName": "client_id",
                    "KeyType": "HASH"
                },
                {
                    "AttributeName": "id",
                    "KeyType": "RANGE"
                }
            ],
            "TableStatus": "ACTIVE",
            "CreationDateTime": 1563765603.777,
            "ProvisionedThroughput": {
                "LastIncreaseDateTime": 0.0,
                "LastDecreaseDateTime": 0.0,
                "NumberOfDecreasesToday": 0,
                "ReadCapacityUnits": 5,
                "WriteCapacityUnits": 5
            },
            "TableSizeBytes": 0,
            "ItemCount": 0,
            "TableArn": "arn:aws:dynamodb:ddblocal:000000000000:table/use_statistics",
            "BillingModeSummary": {
                "BillingMode": "PROVISIONED",
                "LastUpdateToPayPerRequestDateTime": 0.0
            }
        }
    }

    作成に成功したらaws cliコマンドでデータテーブルが存在するかどうかを確認します.
    $ aws dynamodb list-tables --region us-west-2 --endpoint-url http://127.0.0.1:8000
    {
        "TableNames": [
            "use_statistics"
        ]
    }

    構成の説明
    リソースの作成
    EMQ X Dashboardを開き、左側のメニューのリソースページに入り、新規ボタンをクリックし、DynamoDBサーバ情報を入力してリソース作成を行います.
    EMQ Xクラスタ内のノードが存在するネットワーク環境は互いに異なる可能性があり、リソースの作成に成功した後、リスト内のステータスボタンをクリックし、各ノードのリソース接続状況を確認し、ノード上のリソースが使用できない場合は、構成が正しいかどうか、ネットワーク接続性を確認し、再接続ボタンをクリックして手動で再接続してください.
    ルールの作成
    左側のメニューのルールページに入り、新規ボタンをクリックしてルール作成を行います.ここでは、イベント・メッセージのパブリケーションをトリガーすることを選択し、メッセージのパブリケーション時にルールをトリガーしてデータ処理を行います.
    トリガイベントを選択すると、オプションフィールドとサンプルSQLがインタフェースに表示されます.
    必要なフィールドのフィルタ
    ルール・エンジンはSQL文を使用してルール条件を処理します.このビジネスでは、payloadのすべてのフィールドを個別に選択し、payload.fieldName形式で選択する必要があります.また、メッセージ・コンテキストのtopicqosidの情報も必要です.現在のSQLは次のとおりです.
    SELECT
      payload.id as client_id, payload.speed as speed, 
      payload.tachometer as tachometer,
      payload.ts as ts, id
    FROM
      "message.publish"
    WHERE
      topic =~ 't/#'

    フィルタ条件の確立
    SQL文WHERE文を使用して条件フィルタを行います.このビジネスでは、2つの条件を定義する必要があります.
  • cmd/state/:idトピックのみを処理し、=~はトピックワイルドカードtopicを使用してフィルタリングする:topic =~ 'cmd/state/+'
  • .
  • tachometer > 8000のメッセージのみを処理し、比較子を使用してtachometerをフィルタリングする:payload.tachometer > 8000
  • .
    前のステップを組み合わせてSQLは次のようになります.
    SELECT
      payload.id as client_id, payload.speed as speed, 
      payload.tachometer as tachometer,
      payload.ts as ts,
      id
    FROM
      "message.publish"
    WHERE
      topic =~ 'cmd/state/+'
      AND payload.tachometer > 8000

    SQLテスト機能による出力テスト
    SQLテスト機能を使用すると、payloadなどのシミュレーション元のデータを指定する必要がある現在のSQL処理後のデータ出力をリアルタイムで表示できます.
    payloadデータは、SQL条件を満たすためにtachometerの数値サイズを変更することに注意してください.
    {
      "id": "NXP-058659730253-963945118132721-22",
      "speed": 32.12,
      "direction": 198.33212,
      "tachometer": 9001,
      "dynamical": 8.93,
      "location": {
        "lng": 116.296011,
        "lat": 40.005091
      },
      "ts": 1563268202
    }

    SQLテスト切替ボタンをクリックし、topicpayloadをシーンの情報に変更し、テストボタンをクリックしてデータ出力を表示する.
    テスト出力データは次のとおりです.
    {
      "client_id": "NXP-058659730253-963945118132721-22",
      "id": "589A429E9572FB44B0000057C0001",
      "speed": 32.12,
      "tachometer": 9001,
      "ts": 1563268202
    }

    テスト出力は予想通りで、後続のステップを行うことができます.
    応答アクションを追加し、DynamoDBにメッセージを格納
    SQL条件入力出力に誤りがない場合は、引き続き対応するアクションを追加し、SQL文の書き込みを構成し、フィルタ結果をDynamoDBに格納します.
    応答動作の追加ボタンをクリックし、データをDynamoDBに保存する動作を選択し、選択したリソースを選択し、入力
    DynamoDBテーブル名、Hash KeyおよびRange Key.
    テスト
    予想される結果
    処理アクションを含むルールを作成しました.アクションの期待効果は次のとおりです.
  • デバイスがcmd/state/:idトピックにメッセージを報告すると、メッセージのtachometerの数値が8000を超えるとSQLにヒットし、ルールリストのヒット数が1増加する.
  • DynamoDBのuse_statistics表に、現在のメッセージと一致するデータが追加されます.

  • DashboardのWebsocketツールを使用してテスト
    ツール-->Websocketページに切り替え、任意の情報クライアントを使用してEMQ Xに接続し、接続に成功した後、メッセージカードに次の情報を送信します.
  • トピック:cmd/state/NXP-0586599730253-96394511822721-22
  • メッセージボディ:
  • {
      "id": "NXP-058659730253-963945118132721-22",
      "speed": 32.12,
      "direction": 198.33212,
      "tachometer": 9001,
      "dynamical": 8.93,
      "location": {
        "lng": 116.296011,
        "lat": 40.005091
      },
      "ts": 1563268202
    }

    送信ボタンをクリックすると、送信に成功した後、現在のルールがヒットした統計値が1であることがわかります.
    aws cliコマンドラインのデータテーブルレコードを表示すると、次のようになります.
    これにより,ルールエンジンを用いてDynamoDBにメッセージを格納するビジネス開発を実現した.
    詳細については、公式サイトemqxにアクセスしてください.io、あるいは私たちのオープンソースプロジェクトgithubに注目します.com/emqx/emqx、詳細は公式ドキュメントにアクセスしてください.