Nifi + Kafka + Strom を用いたデータ処理ハンズオン(2) ~Kafkaへのメッセージ送信~


Theme

今回は前回に引き続き、以下を参考に進めていきます。
https://github.com/ijokarumawak/hdf-tutorials-ja/wiki/Apache-NiFi-Dataflow-Automation-Concepts

前回のを見てない場合は、こちらからご確認いただければと思います。
Part1

今回はKafkaについて見ていきます。

Kafkaとは

Kafkaは大量のメッセージを高速に扱うことができる分散メッセージシステムで、メッセージングキューとして必要な様々な機能があります。

大まかに分けて、Producer(書き込み)、Topic(Queue)、Consumer(読み込み)で構成されます。
ProducerがTopicに書き込み、ConsumerはTopicにデータをとりに行きます。

Reporting ProcessGroupの作成

今回は、前回までのHTTP APIとは別にReportingというProcess Groupを作成します。

参考記事からそのまま言葉をお借りしますが、

単一のグループで作業を進め、大きなデータフローになってくると、管理が煩雑になってしまいます。 チュートリアルでは、以下の2つに分けています:

  • データを外部から収集し、共通のフォーマットに変換する部分
  • 共通のフォーマットのデータを入力として、Kafkaにメッセージ登録を行う部分

こうすることで、HTTP以外の、TCPやMQTTなどでメッセージを受信するルートを増やす際に、変換部分のみを実装すれば良くなります。

Processの配置

Input Portの追加

他のProcessGroupからFlowFileを渡すために、ReportingグループへInput Portを追加していきます。

(追加後)

PublishKafka_2_0の追加

PublishKafkaに関しては、自分の環境(kafkaのversion)にあったものを設定してください。
私の環境は、ambari2.7.3で入れたものになるので、2.0となっています。

前回同様、ConfigureからPROPATIESに行き、以下を設定します。

  • Kafka Brokers
    localhost:6667

  • Topic Name
    input

  • Delivery Guarantee
    Guarantee Replicated Delivery

  • Kafka Key
    ${message.key}

Kafkaメッセージはkeyとvalueを持っています。 Keyには、渡ってきたFlowFileのmessage.key Attributeを利用します。
Valueには、FlowFileのcontentが渡されます。

kafka BrokersのPortについては、ambariからkafkaをインストールした場合は、
Kafka > Configsから確認できます。

SETTINGSタブでは、Automatically terminate relationshipsにおいてsuccessにチェックを入れます。

APPLYを選択した後、LogAttributeも設置し、すべてをConnectionを作成しましょう。

HTTP APIからReportingへデータを流し込む

ルートProcessGroupへと戻り、HTTP APIからReportingへとRelationをつなぎます。
こうすることで、ProcessGroup間はInput PortとOutput Portでデータの連携が可能になります。

Console Consumerで確認しながらテスト

サーバにSSHでログインし、以下のコマンドでConsole Consumerを起動します。

cd /usr/hdp/current/kafka-broker
./bin/kafka-console-consumer.sh --topic input --bootstrap-server localhost:6667 --new-consumer

別terminalで以下を実施します。

# curl -i -X POST -H "Content-type: application/json" -d '{"name": "C", "age": 20}' localhost:9095

#### Postした側 ####
HTTP/1.1 202 Accepted
Date: Tue, 15 Oct 2019 13:59:47 GMT
Transfer-Encoding: chunked
Server: Jetty(9.4.11.v20180605)

{"Result": "succeeded"}

#### consumer ####
20

Ctrl+Cを押すと

Processed a total of 1 messages

と表示されると思います。

これで、Postしたデータに変更を加えて、Kafkaに送信し、そのメッセージを受け取るまでのフローをNifiから操作できました。

次回

次は、このメッセージをStormに送ってリアルタイム分析をしていきます。