Node-REDでのElasticsearch活用法


はじめに

Node-REDでよくElasticsearchへデータ投入するので、ノウハウを書きたいです。
追加ノードでも色々ありますが、Elasticsearchはバージョンの更新が早く、APIも変わる事があります。
そのため、標準のノードを組み合わせての利用が無難かと思ってます。

ソフトウェア環境

  • Node-RED v1.0.3
  • Elasticsearch 7.3.2

サブフロー化

まずは、Elasticsearchへ通信するhttpノードをサブフロー化します。これにより、BASIC認証などの情報をノードを設置する際に毎回設定しなくて済みます。あと、常にヘッダに"Content-Type: 'application/json'"を設定するようにしましょう。これで少し、Elasticsearchへの接続設定の手間が省けます。
URLやメソッドはリクエストにより変化するため、サブフロー内では設定しないようにしましょう。

書き出したコードは以下です。

[{"id":"3f7774f5.a6a20c","type":"subflow","name":"Elasticsearch","info":"","category":"","in":[{"x":50,"y":30,"wires":[{"id":"c5bb15c.4c10de8"}]}],"out":[{"x":580,"y":30,"wires":[{"id":"19d2a3b.d048a5c","port":0}]}],"env":[],"color":"#DDAA99"},{"id":"eeda88c1.f8b278","type":"http request","z":"3f7774f5.a6a20c","name":"Elasticsearch","method":"use","ret":"txt","paytoqs":false,"url":"","tls":"","persist":false,"proxy":"","authType":"basic","x":340,"y":30,"wires":[["19d2a3b.d048a5c"]]},{"id":"c5bb15c.4c10de8","type":"change","z":"3f7774f5.a6a20c","name":"msg.headers","rules":[{"t":"delete","p":"headers","pt":"msg"},{"t":"set","p":"headers","pt":"msg","to":"{\"Content-Type\":\"application/json\",\"Connection\":\"close\"}","tot":"json"}],"action":"","property":"","from":"","to":"","reg":false,"x":170,"y":30,"wires":[["eeda88c1.f8b278"]]},{"id":"19d2a3b.d048a5c","type":"json","z":"3f7774f5.a6a20c","name":"","property":"payload","action":"","pretty":false,"x":490,"y":30,"wires":[[]]}]

1. データ投入

Linkingデバイスという、環境センサーからのビーコンを受信し、Elasticsearchへデータ投入する例です。

linking-scannerノードからは以下のようなデータがペイロードとして渡されます。

このデータをElasticsearchへ投入するために、Changeノードで msg.method と msg.url を設定します。
msg.urlは前のノードの値を使って、動的なURLを生成します。
プログラムを組める方はfunctionノードに頼りがちですが、実は使わずに生成できます。

ついでに、JSONata式を使えば、現時刻もChangeノードで生成できます。

Elasticsearchへ投入するJSONデータをテンプレートノードで生成します。
Mustacheテンプレートを使えば、msgオブジェクト内のデータが使えます。

このフローを書き出したものがこちらです。(Elasticsearchサブフローも含まれます)

[{"id":"3f7774f5.a6a20c","type":"subflow","name":"Elasticsearch","info":"","category":"","in":[{"x":50,"y":30,"wires":[{"id":"c5bb15c.4c10de8"}]}],"out":[{"x":580,"y":30,"wires":[{"id":"19d2a3b.d048a5c","port":0}]}],"env":[],"color":"#DDAA99"},{"id":"eeda88c1.f8b278","type":"http request","z":"3f7774f5.a6a20c","name":"Elasticsearch","method":"use","ret":"txt","paytoqs":false,"url":"","tls":"","persist":false,"proxy":"","authType":"basic","x":340,"y":30,"wires":[["19d2a3b.d048a5c"]]},{"id":"c5bb15c.4c10de8","type":"change","z":"3f7774f5.a6a20c","name":"msg.headers","rules":[{"t":"delete","p":"headers","pt":"msg"},{"t":"set","p":"headers","pt":"msg","to":"{\"Content-Type\":\"application/json\",\"Connection\":\"close\"}","tot":"json"}],"action":"","property":"","from":"","to":"","reg":false,"x":170,"y":30,"wires":[["eeda88c1.f8b278"]]},{"id":"19d2a3b.d048a5c","type":"json","z":"3f7774f5.a6a20c","name":"","property":"payload","action":"","pretty":false,"x":490,"y":30,"wires":[[]]},{"id":"e6695b94.08cca8","type":"subflow:3f7774f5.a6a20c","z":"f4557e75.b0f3f","x":640,"y":140,"wires":[[]]},{"id":"7108220c.1b5d9c","type":"change","z":"f4557e75.b0f3f","name":"POST _doc","rules":[{"t":"set","p":"method","pt":"msg","to":"POST","tot":"str"},{"t":"set","p":"url","pt":"msg","to":"http://localhost:9200/sensor-","tot":"str"},{"t":"change","p":"url","pt":"msg","from":"$","fromt":"re","to":"payload.service","tot":"msg"},{"t":"change","p":"url","pt":"msg","from":"$","fromt":"re","to":"/_doc","tot":"str"},{"t":"set","p":"date","pt":"msg","to":"$now()","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":280,"y":140,"wires":[["69e18d2a.9e18d4"]]},{"id":"69e18d2a.9e18d4","type":"template","z":"f4557e75.b0f3f","name":"ES Query JSON","field":"payload","fieldType":"msg","format":"handlebars","syntax":"mustache","template":"{\n    \"date\": \"{{{date}}}\",\n    \"device\": \"{{{payload.device}}}\",\n    \"{{payload.service}}\": {{{payload.data}}}\n}","output":"str","x":460,"y":140,"wires":[["e6695b94.08cca8"]]},{"id":"d541687c.a302d8","type":"linking-scanner","z":"f4557e75.b0f3f","name":"","autostart":true,"duration":"","interval":"30","x":100,"y":140,"wires":[["7108220c.1b5d9c"]]}]

2. 時系列クエリ

最新の温度を検索する場合のフローです。シンプルに、ChangeノードとElasticsearchサブフローのみです。

Changeノードは以下のように、msg.method と msg.url と、ms.payloadに検索クエリを設定します。

Elasticsearchの検索クエリは以下のようになってます。日付をDESCにして最初の1レコードのみを要求します。

結果は以下のようになります。 msg.payload.hits.hits[0]._source にデータが返ってきます。

直近10分間の平均は以下のような検索クエリにしています。

msg.payload.aggregations.temperature_avg.buckets[0].aggs.value に入ってますね。

3. 単語解析

Elasticsearchでは分かち書きのような事もできます。

日本語を単語分したい場合は、Elasticsearchのプラグインanalysis-kuromojiをインストールする必要があります。

結果は、msg.payload.tokens[]に入ってます。

単語が取れれば、単語についての分析などが実装できると思います。

settings , mappings もできますし、ファイルノードを組み合わせてユーザ辞書の更新もNode-REDでしています。

おわりに

個人的に、Node-REDとElasticsearchは非常に相性良く組み合わせて使う事ができると考えています。
実際に仕事ではオリジナルのデータに分析データを埋め込み、Kibanaで可視化をしています。