EMQ Xルールエンジンシリーズ(12)は、メッセージをRedisに格納する


Redisの紹介
Redisは、BSDプロトコルを完全にオープンソースで無料で遵守する高性能key-valueデータベースです.
他のkey-valueキャッシュ製品Redisに比べて、次のような特徴があります.
  • Redisは非常に高性能で、単機で10万級の読み書き速度をサポートしています.
  • Redisはデータの永続化をサポートし、メモリのデータをディスクに保存し、再起動時に再ロードして使用することができます.
  • Redisは、単純なkey-valueタイプのデータだけでなく、list、set、zset、hashなどのデータ構造の記憶も提供します.
  • Redisは、データのバックアップ、すなわちmaster-slaveモードのデータバックアップをサポートします.

  • 読者は、Redis公式のQuick Start(https://redis.io/topics/quick...)を参照してRedisをインストールし(この文書を書く場合、Redisバージョンは5.0)、redis-serverコマンドでRedisサーバを起動することができます.
    シーン紹介
    このシーンでは、EMQXがトピックの下に指定し、条件を満たすメッセージをReidsに格納する必要がある.後続の分析検索を容易にするためには、メッセージコンテンツを分割して格納する必要があります.
    このシナリオでは、デバイス側から次の情報が報告されます.
  • 報告テーマ:cmd/state/:id、テーマ中idは車両クライアント識別コード
  • を表す.
  • メッセージボディ:
    {
      "id": "NXP-058659730253-963945118132721-22", //       
      "speed": 32.12, //     
      "direction": 198.33212, //     
      "tachometer": 3211, //      ,     8000      
      "dynamical": 8.93, //     
      "location": { // GPS      
        "lng": 116.296011,
        "lat": 40.005091
      },
      "ts": 1563268202 //     
    }
  • 報告されたデータエンジン回転数が8000より大きい場合、現在の情報は、ユーザ車両の使用状況を後で分析するために記憶される.
    構成の説明
    リソースの作成
    EMQ X Dashboardを開き、左側のメニューのリソースページに入り、新規ボタンをクリックし、Redisサーバ情報を入力してリソース作成を行います.
    EMQ Xクラスタ内のノードが存在するネットワーク環境は互いに異なる可能性があり、リソースの作成に成功した後、リスト内のステータスボタンをクリックし、各ノードのリソース接続状況を確認し、ノード上のリソースが使用できない場合は、構成が正しいかどうか、ネットワーク接続性を確認し、再接続ボタンをクリックして手動で再接続してください.
    ルールの作成
    左側のメニューのルールページに入り、新規ボタンをクリックしてルール作成を行います.ここでは、イベント・メッセージのパブリケーションをトリガーすることを選択し、メッセージのパブリケーション時にルールをトリガーしてデータ処理を行います.
    トリガイベントを選択すると、オプションフィールドとサンプルSQLがインタフェースに表示されます.
    必要なフィールドのフィルタ
    ルール・エンジンはSQL文を使用してルール条件を処理します.このビジネスでは、payloadのすべてのフィールドを個別に選択し、payload.fieldName形式で選択する必要があります.また、メッセージ・コンテキストのtopicqosidの情報も必要です.現在のSQLは次のとおりです.
    SELECT
      payload.id as client_id, payload.speed as speed, 
      payload.tachometer as tachometer,
      payload.ts as ts, id
    FROM
      "message.publish"
    WHERE
      topic =~ 't/#'

    フィルタ条件の確立
    SQL文WHERE文を使用して条件フィルタを行います.このビジネスでは、2つの条件を定義する必要があります.
  • cmd/state/:idトピックのみを処理し、=~はトピックワイルドカードtopicを使用してフィルタリングする:topic =~ 'cmd/state/+'
  • .
  • tachometer > 8000のメッセージのみを処理し、比較子を使用してtachometerをフィルタリングする:payload.tachometer > 8000
  • .
    前のステップを組み合わせてSQLは次のようになります.
    SELECT
      payload.id as client_id, payload.speed as speed, 
      payload.tachometer as tachometer,
      payload.ts as ts,
      id
    FROM
      "message.publish"
    WHERE
      topic =~ 'cmd/state/+'
      AND payload.tachometer > 8000

    SQLテスト機能による出力テスト
    SQLテスト機能を使用すると、payloadなどのシミュレーション元のデータを指定する必要がある現在のSQL処理後のデータ出力をリアルタイムで表示できます.
    payloadデータは、SQL条件を満たすためにtachometerの数値サイズを変更することに注意してください.
    {
      "id": "NXP-058659730253-963945118132721-22",
      "speed": 32.12,
      "direction": 198.33212,
      "tachometer": 9001,
      "dynamical": 8.93,
      "location": {
        "lng": 116.296011,
        "lat": 40.005091
      },
      "ts": 1563268202
    }

    SQLテスト切替ボタンをクリックし、topicpayloadをシーンの情報に変更し、テストボタンをクリックしてデータ出力を表示する.
    テスト出力データは次のとおりです.
    {
      "client_id": "NXP-058659730253-963945118132721-22",
      "id": "589A429E9572FB44B0000057C0001",
      "speed": 32.12,
      "tachometer": 9001,
      "ts": 1563268202
    }

    テスト出力は予想通りで、後続のステップを行うことができます.
    レスポンスアクションを追加し、メッセージをRedisに格納
    SQL条件入力出力に誤りがない場合は、引き続き適切なアクションを追加し、SQL文の書き込みを構成し、フィルタ結果をRedisに格納します.
    応答アクションの追加ボタンをクリックし、データをRedisアクションに保存することを選択し、選択したリソースを選択します.${fieldName}構文を使用してSQL文を入力し、データをデータベースに挿入し、最後に新しいボタンをクリックしてルールの作成を完了します.
    動作のSQL構成は次のとおりです.
    HMSET test client_id "${client_id}" speed "${speed}" tachometer "${tachometer}" ts "${ts}" msg_id "${msg_id}"

    Redisのハッシュ・テーブル構造を使用して、メッセージidを示すハッシュ・テーブルを作成します.
    テスト
    予想される結果
    処理アクションを含むルールを作成しました.アクションの期待効果は次のとおりです.
  • デバイスがcmd/state/:idトピックにメッセージを報告すると、メッセージのtachometerの数値が8000を超えるとSQLにヒットし、ルールリストのヒット数が1増加する.
  • Redisは、現在のメッセージと一致する現在のメッセージidに命名されたハッシュ・テーブルを追加します.

  • DashboardのWebsocketツールを使用してテスト
    ツール-->Websocketページに切り替え、任意の情報クライアントを使用してEMQ Xに接続し、接続に成功した後、メッセージカードに次の情報を送信します.
  • トピック:cmd/state/NXP-0586599730253-96394511822721-22
  • メッセージボディ:
    {
      "id": "NXP-058659730253-963945118132721-22",
      "speed": 32.12,
      "direction": 198.33212,
      "tachometer": 8081,
      "dynamical": 8.93,
      "location": {
        "lng": 116.296011,
        "lat": 40.005091
      },
      "ts": 1563268202
    }
  • 送信ボタンをクリックすると、現在のルールのヒット統計値が1であることがわかります.
    Redisコマンドラインでハッシュ表レコードを表示すると、次のようなデータが得られます.
    これにより,ルールエンジンを用いてReidsにメッセージを格納するビジネス開発を実現した.
    詳細については、公式サイトemqx.ioにアクセスするか、オープンソースgithub.com/emqx/emqxに注目してください.詳細については、公式ドキュメントにアクセスしてください.