Nifi + Kafka + Strom を用いたデータ処理ハンズオン(1) ~Nifiの基礎部分~


Theme

今回、以下を参考に新しいバージョンでのNifi, Kafka, Stormをつかった検証を行っていきたいと思います。
https://github.com/ijokarumawak/hdf-tutorials-ja/wiki/Apache-NiFi-Dataflow-Automation-Concepts

主に、今回はNifiのパートになります。

Apache NiFiとは

システム間のデータフローを自動化し管理するためのOSS

Nifi 基本操作

login

NifiはNifiを動かしているサーバの9090portを使用します。

Process Groupの作成

赤く囲った部分をドラッグして、方眼上に持っていき、リリースしてください。

今回、名前をHTTP APIとします。

ADDを押し、少し時間を置くと以下のようにProcess Groupが生成されます。
そうしたら、ダブルクリックをして中に入ってください。

処理を定義

HTTP APIに入ると、左下のOperatorHTTP APIに変わります。

HandleHttpRequestプロセッサの追加

検索場所にてHandleと打つと、HandleHttpRequestというのがあるので、それをダブルクリックします。

(追加後)

上の様に表示されましたら、追加したHandleHttpRequestプロセッサを右クリックし、Configureを選択します。

Propertyタブに移動し、Listening Port9095にします。
その後、APPLYを選択します。

ReplaceTextの追加

こちらでは、HTTPレスポンスで返すボディの文字列を設定します。
前と同様に、Processorを追加します。

追加されたProcessorに対して、右クリックし、Configureを選択します。
Propertyタブに移動し、Replacement Value{"Result": "Succeeded"},
Replacement StrategyAlways Replace
に変更し、APPLYを選択します。

HandleHttpResponseプロセッサの追加

次は、HandleHttpResponceプロセッサを追加します。

(追加後)

そうしたら、HandleHttpResponseのstatusコードを設定していきます。
HandleHttpResponce上で右クリックをし、Configureを選択します。
その後、PROPERTIESにてHTTP Status Codeを決め打ちで202(Accepted)
にします。

3つのプロセッサの接続

NiFiのデータフローは、プロセッサ同士をRelationshipでつなぐことで流れを生成します。
プロセッサの中心からマウスをドラッグし、接続先のプロセッサでドロップすることで、つなぐことができます。

From ProcessorTo Processorを確認し、For relationshipsSuccessを選択し、ADDを選択します。

(接続後)

LogAttributeにFailureを流す

プロセッサの処理が失敗した場合、渡ってきたFlowFileはRelationshipfailureに流されます。
失敗内容が確認できるように、LogAttributeプロセッサを追加します。

(追加後)

その後、'Relationship'で接続します。

不要なRelationshipのAuto-Terminate

NiFiの内部を流れるFlowFileは、行き先がなくなる (終点までたどり着く) と、削除されます。
今回の設定では、HandleHttpResponseLogAttributeにおいて、処理がSuccessになった後の処理がありません。
なので、そのあとはNifiFlowを自動で消去する設定を入れてあげます。

以下のRelationshipに関して、プロセッサで右クリックをし、Configureを選択します。'SETTINGSタブからAutomatically Terminate Relationshipsにおいて、success`にチェックを入れます。

  • HandleHttpResponse: success
  • LogAttribute: success

Http Context Mapの作成

HandleHttpRequest上で右クリックをし、Configureを選択します。
その後PROPERTIESタブのHttp Context Mapのプルダウンより、Create new service...を選択します。

すると、次の画面のようになります。
CREATEを選択します。

その後、APPLYを選択すると、以下の画面になります。

右側、真ん中の雷のようなアイコンを選択すると以下の画面が表示されるので、ENABLEを選択します。

同様にHandleHttpRequestのHTTP Context Mapでも同じControllerServiceを指定します。
これでプロセッサに必要な設定が完了します。

フローの開始

左側にあるOperateの再生ボタンを選択し、フローを開始します。
一度、方眼上でクリックをし、特定のプロセッサから選択を外してから行ってください。

全て起動が成功すると、以下の様になります。

cURLで簡単なテスト

curl -i -X POST -H "Content-type: application/json" -d '{"name": "Nifi", "process": "test"}' localhost:9095
HTTP/1.1 202 Accepted
Date: Mon, 14 Oct 2019 10:47:51 GMT
Transfer-Encoding: chunked
Server: Jetty(9.4.11.v20180605)

{"Result": "succeeded"}

NifiでもIn/Outから、処理の流れが確認できます。

データの変換

プロセッサの作成

POSTで受信したデータをKafkaに登録する前に整形していきます。

次のプロセッサを追加します:

  • EvaluateJsonPath: FlowFileのcontentをJSONとしてパースし、JSONPathで任意の要素の値を抽出します。
    抽出結果をFlowFileのAttributeに保存します。

  • ReplaceText: Kafkaに登録する際、FlowFileのContentがメッセージのvalueとなります。
    抽出したJSON内の値を再びFlowFileのContentに戻します。

EvaluateJsonPath

Processの追加をします。

(追加後)

その後、右クリックでConfigureを選択します。
PROPERTIESにて以下の設定をしていきます。

  • Destination flowfile-attribute

以下2つは、右上の+ボタンを押して追加していきます。
- message.key
$.name
- message.value
$.age

これによって、NifiFlowの中にmessage.key: $.name(つまりJsonでのnameに対するvalue)というkey-value型のデータが格納されます。
message.valueも同様です。

ReplaceText

Processの追加をします。

(追加後)

その後、右クリックでConfigureを選択します。
PROPERTIESにて以下の設定をしていきます。

  • Replacement Value
    ${message.value}
    これは、$で始まっていますが、Apache NiFi Expression Language Guideというもので、JsonPathではありません。

  • Replacement Strategy
    Always Replace

Output Portの追加

以下の箇所から、Output Portを作成し、名前を入力してADDを選択します。

(追加後)

processの接続

今回作成したprocessを接続して、前回したテストを実行してみます。

EvaluateJsonPathからReplaceTextへのRelationshipでは、matchedを選択します。

最終的に以下のようになります。

では、起動してテストをしてみます。
テスト結果

curl -i -X POST -H "Content-type: application/json" -d '{"name": "Nifi", "process": "test"}' localhost:9095
HTTP/1.1 202 Accepted
Date: Mon, 14 Oct 2019 13:21:56 GMT
Transfer-Encoding: chunked
Server: Jetty(9.4.11.v20180605)

{"Result": "succeeded"}

先ほどと同様の結果です。
では、今回作ったフローの流れはどうなっているかというと
フローのIn/Outはそれぞれ1になっています。

Output Portの前のConnectionの上で右クリックしてList Queueを選択してみます。

すると、下の様にQueueの中に先ほどのNifiFlowが格納されています。

次回

次回では、このNifiからKafkaに対してメッセージを送りたいと思います。