PythonとSensorTag, Kafka, Spark Streamingのストリーム処理 - Part 2: KafkaとLandoop


 前回はRaspberry Pi 3上でSensorTagから環境データを取得するPythonスクリプトを書きました。この環境データはKafkaを経由してストリーム処理する予定です。次にRaspberry Pi 3からメッセージを受け取るKafkaクラスタをクラウド上に構築していきます。
 
 KafkaクラスタはLandoopが開発しているfast-data-devのDockerイメージを使います。

LandoopでKafkaクラスタを構築する

 LandoopはKafkaのソリューションを提供する会社です。特にKafka Connectを中心にしたストリームのデータパイプラインの開発に強い印象です。

Landoop 

 
 LandoopではKafka Topics UIKafka Connect UIKafka Topics UIなどのWebツールを開発しています。demoサイトからどのようなツールなのかを確認できます。このdemoサイトと同じ環境はfast-data-connect-clusterを使うと簡単に構築することができます。
 
 

fast-data-connect-cluster

 fast-data-connect-clusterのリポジトリをcloneします。

$ git clone https://github.com/Landoop/fast-data-connect-cluster

 
 リポジトリに含まれるdocker-compose.ymlを少し変更してクラウド上で利用します。Debianの仮想マシンを用意してDockerとDocker Composeをインストールしました。利用するバージョンは以下です。

$ docker --version
Docker version 17.06.0-ce, build 02c1d87

$ docker-compose --version
docker-compose version 1.14.0, build c7bdf9e

 このdocker-compose.ymlにはLandoopのWebツールに加えてConfluent Open Sourceに含まれるKafkaSchema RegistryKafka REST ProxyKafka ConnectApache ZooKeeperが含まれます。一通りKafkaを使った開発に必要なコンテナが揃うのでとても便利です。

 現在Confluent Open SourceとKafkaのバージョンは以下になっています。
 
* Confluent Open Source: v3.2.2
* Kafka v0.10.2.1

 docker-compose.ymlの主な変更点です。

  • ADV_HOST: Dockerが起動している仮想マシンのパブリックIPアドレスを指定します。
  • ports: リモートから接続するためにZooKeeperやKafkaクラスタのポートを公開します。
docker-compose.yml
version: '2'
services:
  kafka-stack:
    image: landoop/fast-data-dev
    environment:
      - FORWARDLOGS=0
      - RUNTESTS=0
      - ADV_HOST=210.xxx.xxx.xxx
    ports:
      - 3030:3030
      - 9092:9092
      - 2181:2181
      - 8081:8081
  connect-node-1:
    image: landoop/fast-data-dev-connect-cluster
    depends_on:
      - kafka-stack
    environment:
      - ID=01
      - BS=kafka-stack:9092
      - ZK=kafka-stack:2181
      - SR=http://kafka-stack:8081
  connect-node-2:
    image: landoop/fast-data-dev-connect-cluster
    depends_on:
      - kafka-stack
    environment:
      - ID=01
      - BS=kafka-stack:9092
      - ZK=kafka-stack:2181
      - SR=http://kafka-stack:8081
  connect-node-3:
    image: landoop/fast-data-dev-connect-cluster
    depends_on:
      - kafka-stack
    environment:
      - ID=01
      - BS=kafka-stack:9092
      - ZK=kafka-stack:2181
      - SR=http://kafka-stack:8081
  connect-ui:
    image: landoop/kafka-connect-ui:latest
    depends_on:
      - connect-node-1
    environment:
      - CONNECT_URL=http://connect-node-1:8083
    ports:
      - 8000:8000

 docker-compose.ymlのディレクトリに移動してコンテナを起動します。

$ cd fast-data-connect-cluster
$ docker-compose up -d

動作確認

 Kafkaクライアントから簡単に動作確認をします。kafka-topicsコマンドでトピックを作成します。

$ docker-compose exec kafka-stack kafka-topics \
    --create --topic test \
    --zookeeper localhost:2181 \
    --partitions 1 --replication-factor 1

 kafka-console-consumerコマンドを実行します。メッセージがトピックに届くとこのシェルに表示されます。

$ docker-compose exec kafka-stack \
  kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --from-beginning \
  --topic test

 別のシェルからkafka-console-producerコマンドを実行します。

$ docker-compose exec kafka-stack \
  kafka-console-producer \
  --broker-list localhost:9092 \
  --topic test

 コマンドは待機状態になるので適当なメッセージをシェルに入力します。同じメッセージがkafka-console-consumerのシェルにも表示されます。

hello world

 Kafka Topics UIのページではトピックの一覧とメッセージの中身を確認することができます。
 

SensorTagの環境データをKafkaに送信する

kafka-python

 PythonのKafkaクライアントにはkafka-pythonconfluent-kafka-pythonがあります。APIが微妙に違うので間違えないようにします。今回はkafka-pythonをインストールして使います。

$ sudo pip install kafka-python

 前回書いたSensorTagのデータを取得するPythonのコードにKafkaへメッセージを送信するproducerを追加します。

json_producer_sensortag_kafka.py
from bluepy.sensortag import SensorTag
import sys
import time
import json
import calendar

from kafka import KafkaProducer

def main():
    argvs = sys.argv
    argc = len(argvs)
    if (argc != 2):
        print 'Usage: # python {0} bd_address'.format(argvs[0])
        quit()
    bid = argvs[1]
    print('Connecting to ' + bid)

    timeout = 10.0

    tag = SensorTag(bid)
    tag.IRtemperature.enable()
    tag.humidity.enable()

    time.sleep(1.0)

    producer = KafkaProducer(bootstrap_servers='210.xxx.xxx.xxx:9092')
    while True:
        tAmb, tObj = tag.IRtemperature.read()
        humidity, rh = tag.humidity.read()

        value = {
            "bid" : bid,
            "time" : calendar.timegm(time.gmtime()),
            "ambient": tAmb,
            "objecttemp": tObj,
            "humidity": humidity,
            "rh": rh
        }

        msg = json.dumps(value).encode("utf-8")
        producer.send('sensortag', msg)
        producer.flush()
        print(msg)

        time.sleep(timeout)

    tag.disconnect()
    del tag

if __name__ == '__main__':
    main()

 SensorTagのBDアドレスを引数にしてPythonスクリプトを実行します。

$ python json_producer_sensortag_kafka.py B0:B4:48:BE:5E:00

 10秒間隔で取得した環境データをJSON文字列に整形してKafkaクラスタに送信します。

Connecting to B0:B4:48:BE:5E:00
{"bid": "B0:B4:48:BE:5E:00", "time": 1501464133, "humidity": 26.8096923828125, "objecttemp": 22.0625, "ambient": 26.59375, "rh": 68.829345703125}
{"bid": "B0:B4:48:BE:5E:00", "time": 1501464143, "humidity": 26.86004638671875, "objecttemp": 22.40625, "ambient": 26.65625, "rh": 68.927001953125}
{"bid": "B0:B4:48:BE:5E:00", "time": 1501464153, "humidity": 26.92047119140625, "objecttemp": 22.71875, "ambient": 26.71875, "rh": 68.95751953125}

 Kafka Topics UIの画面にもSensorTagの環境データが表示されました。