Tabe/gのデータ解析部分に関して


説明内容

  • Maker Faire Tokyo 2017の中で出展した「Tabe/g」で利用したものの説明
  • ここに記載するのは、データをクラウドに吸い上げて可視化するまでのところになります

システム概要

  • データの流れは下記の通りで、今回は太字のところを試す

RaspberryPi3 -> Soracom Air -> Soracom Beam -> AWS IoT -> Amazon Elasticsearch Service -> kibana

やった見たこと

  • Amazon Elasticsearch Serviceの立ち上げ
  • AWS IoTの作成
  • SORACOM Beamの設定

Amazon Elasticsearch Service

立ち上がりが遅いらしいので、Elasticsearchを先に立ち上げておきます

1.ドメインの定義:ドメイン名を「tabegram-sensor」にし、バージョンが5.3を指定

2.クラスタの設定:インスタンス数1、インスタンスタイプt2.small.erasticsearchを指定(お試しなので、専用マスター、ゾーン対応は無効のままにしておきます)し、その他の設定はデフォルトのものを指定

3.アクセスの設定:テンプレートで「1つ以上のAWSアカウントまたはIAMユーザを許可」を選択し、自分のアカウント入れておく

4.確認:再度設定内容を確認して問題ないければ確認ボタンを押す
5.作成されると下記のような画面になり、設定が完了するとドメインのステータスがアクティブになるので、それまでは待ち

AWS IoT

基本的には「Raspberry Pi3×SORACOM×AWS IoT×myThingsで勤怠連絡ボタンを作ってみる」の証明書にアタッチまでは同様。

1.ルールを作成時に、アクションでElasticsearch Serviceを選択

2.必要項目を入れる(IAMも必要であれば適当に作成)

3.下記のようにアクションを設定できたらルールを作成ボタンを押す

疎通確認

  • mosquitto_pub(PC)から下記のコマンド確認
$ mosquitto_pub --cafile rootCA.pem \
  --cert xxxxxxxxxx-certificate.pem.crt \
  --key xxxxxxxxxx-private.pem.key \
  -h xxxxxx.iot.us-west-2.amazonaws.com \
  -p 8883 -d -t topic/tabegram \
  -m '{"timestamp":"2017-08-01T15:20:53.420Z","temperature":20}'
Client mosqpub/821-YukinoMacBo sending CONNECT
Client mosqpub/821-YukinoMacBo received CONNACK
Client mosqpub/821-YukinoMacBo sending PUBLISH (d0, q0, r0, m1, 'topic/tabegram', ... (57 bytes))
Client mosqpub/821-YukinoMacBo sending DISCONNECT
  • Node.jsで確認
meature.js
'use strict'

var awsIot = require( 'aws-iot-device-sdk');
var device = awsIot.device({
    keyPath: 'xxxxxxxxxx-private.pem.key',
    certPath: 'xxxxxxxxxx-certificate.pem.crt',
    caPath: 'rootCA.pem',
    clientId: 'tabegram',
    host: 'xxxxxxxxxx.iot.us-west-2.amazonaws.com'
});

var getRandomData = function(value){
    return Math.floor(value + Math.random()*value/10 - value/10/2);
}

var sendSensorData = function(){
    var data = JSON.stringify({
        timestamp: new Date(),
        temperature: getRandomData(20),
    }); 
    console.log(data);
    device.publish('topic/tabegram', data);
}

console.log('before connect');
device.on('connect', function(){
    console.log('connect');
    setInterval(function(){
        sendSensorData();
    }, 1000);
});

下記は実行結果

$ npm init
$ npm install --save aws-iot-device-sdk
$ node weight.js 
before connect
connect
{"timestamp":"2017-08-01T14:55:15.827Z","temperature":19}
・・・・・

Kibana

  • Elastictsearch ServiceからKibanaのリンクをクリック

  • 最初は下記の画面になるので、「Index name or pattern」「Time-field name※」にtimestampを指定し、Createボタンをクリック

    ※上記で試しにデータをストアしていないと「Time-field name」にtimestampの選択肢がでない

  • Discoverタブを開いて下記のようになって入ればデータが取れている

  • Visualizeタブでグラフを作成

  • DashBoardタブで必要なグラフを置いておけば表示しておきたいデータを可視化できます

SORACOM Beam

$ mosquitto_pub -d -h beam.soracom.io -t topic/tabegram \
  -m '{"timestamp":"2017-08-01T15:19:53.420Z","temperature":20}' -p 1883
Client mosqpub/1426-darmaso-ra sending CONNECT
Client mosqpub/1426-darmaso-ra received CONNACK
Client mosqpub/1426-darmaso-ra sending PUBLISH (d0, q0, r0, m1, 'topic/tabegram', ... (57 bytes))
Client mosqpub/1426-darmaso-ra sending DISCONNECT

他部分と接続

全体絵としては下記

①計測器(Arduino) -> ②通信&ハブ(RaspberryPi) -> ③鏡(Arduino)

1.計測器からは、000〜999のシリアルデータを受け取る
2.受け取ったデータをキャストして、timestamp付きでSORACOM BeamへMQTTで流す(pythonコードは下記)

tabegram.py
import serial
import time
import json
import paho.mqtt.client as mqtt
from datetime import datetime
from pytz import timezone

def on_connect(client, userdata, rc):
  print("Connected with result code " + str(rc))

def on_disconnect(client, userdata, rc):
  if rc != 0:
    print("Unexpected disconnection.")

def on_publish(client, userdata, mid):
  print("publish: {0}".format(mid))

def main():
  con = serial.Serial('/dev/ttyACM0', 9600)
  print con.portstr
  # Setting MQTT
  client = mqtt.Client()
  client.on_connect = on_connect
  client.on_disconnect = on_disconnect
  client.on_publish = on_publish
  client.connect("beam.soracom.io", 1883, 60) 

  while 1:
    str=con.read(3)
    if str != "": 
      print int(str)
      now = datetime.now()
      utc_now = datetime.now(timezone('UTC'))
      iso_now = utc_now.isoformat()
      now_s = "%s" % iso_now
      message = json.dumps({"timestamp":now_s, "weight":int(str)})
      client.publish("topic/tabegram", message)

if __name__ == '__main__':
    main()

3.鏡には特定のデータ範囲によって文字列をシリアルデータで送信
4.Kibanaの方には下記のようにデータが可視化される

感想

  • AWS IoTからElasticksearchへ直結できるのはありがたい
  • あとはKibanaをどこまでカスマイズできるか、例えば単純なブラウズ自動更新機能とかが入っていないのは残念
    • リアルタイムはある程度担保できたので、あとは見せ方のみ
  • とはいえあサーバレスでここまでできるのはとても楽チンでよかったですmm

参考