KafkaStreams紹介(二)


説明:
この文書は、Confluent Platform 3.0のKafka Streamsの翻訳です.
原文住所:https://docs.confluent.io/3.0.0/streams/index.html
他の人が翻訳しているドキュメントをたくさん見て、初めて翻訳したので、何か翻訳の悪いところがあったら指摘してください.
 
これはKafka Streamsが紹介した2編目で、以前の紹介は以下の通りです.
http://blog.csdn.net/ransom0512/article/details/51971112
 
1.クイックスタート
1.1.ターゲット
このクイックエントリーガイドの目的は、KafkaStreamsとの最初のアプリケーションの例を提供することです.Kafka Streamsライブラリを使用して、エンドツーエンドの簡単なデータストリームを実証する場合、最初のサンプルプログラムで説明します.
注目すべきは、このクイックエントリーはKafkaStreamsの表面だけをカバーしており、このドキュメントの残りの部分はより詳細を提供し、クイックエントリーガイドで方向を示します.
1.2.何がしたいの?
このクイックエントリーでは、Apachekafkaを含むwordcountプレゼンテーションアプリケーションを実行します.次のコードの鍵は、Java 8のlambda式を使用して、読みやすいことです.(WordCountLambdaExampleより抜粋):
//   /    Sting Long  
final Serde stringSerde = Serdes.String();
final Serde longSerde = Serdes.Long();
 
//      topic “streams-file-input”   KStream  ,
//              topic “streams-file-input”  。
//(     ,         key.)
KStream textLines = builder.stream(stringSerde, stringSerde, "streams-file-input");
 
KStream wordCounts = textLines
//       ,              。
//          topic         Value。
//    flatMapValues         Value,       flatMap
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
//       countByKey            
//           map key。
.map((key, value) -> new KeyValue<>(value, value))
//  key          
//
//       KStream  KTable (word-count).
//        String long          。
    //
.countByKey(stringSerde, "Counts")
//  KTable KStream
    .toStream();
 
// KStream     topic 。
wordCounts.to(stringSerde, longSerde, "streams-wordcount-output");

上記のコードの実行中に、次の手順に従います.
1、kafkaクラスタを起動する
2、Kafkaconsole producerコマンドライン生産者クライアントを使用してKafka Topicにサンプル入力データを書き込む
3、JavaアプリケーションでkafkaStreamライブラリを使用して入力データを処理する.ここでは,kafkaを含むWordCountのサンプルプログラムを用いた.
4、Kafkaconsole consumerコマンドライン消費者クライアントを使用してアプリケーションの出力を検査する.
5、Kafkaクラスターの停止
1.3.Kafkaクラスタの起動
この章では、Kafkaクラスタを1台のマシンにインストールして起動します.このクラスタは単一ノードKafka(Brokerが1つしかない)と単一ノードZookeeperとを加えて構成されている.wordcountプレゼンテーションでは,このクラスタ依存性が必要である.kafka broker実行アドレスはlocalhost:9092,Zookeeperローカルアドレスはlocalhost:2181と仮定した.
まず、oracle JREまたはJDK 1.7以降のバージョンをインストールします.
次に、Kafka Streamsを含む新しいバージョンApache Kafkaをダウンロードしてインストールします.このため、Confluent Platform 3.0.0バージョンを使用します.
△次の操作は簡単なので、翻訳しません.
# Download and install Confluent Platform 3.0.0 from ZIP archive
$ wget http://packages.confluent.io/archive/3.0/confluent-3.0.0-2.11.zip
$ unzip confluent-3.0.0-2.11.zip
 
# *** IMPORTANT STEP ****
# The subsequent paths and commands used throughout this quickstart assume that
# your are in the following working directory:
$ cd confluent-3.0.0/
 
# Note: If you want to uninstall the Confluent Platform at the end of this quickstart,
# run the following commands.
#
#   $ rm -rf confluent-3.0.0/
#   $ rm -rf /var/lib/kafka          # Data files of Kafka
#   $ rm -rf /var/lib/kafka-streams  # Data files of Kafka Streams
#   $ rm -rf /var/lib/zookeeper      # Data files of ZooKeeper

ヒント:Installationvia ZIP and TAR archivesおよびConfluentPlatform Quickstartを使用して、より詳細な情報を取得できます.
まずZooKeeperインスタンスを起動します.このインスタンスは、ローカル2181ポートをリスニングする.これは長期にわたって実行されるサービスなので、自分の端末で実行する必要があります.
# Start ZooKeeper.  Run this command in its own terminal.
$ ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties

次に、KakfaのBrokerを起動し、ローカル9092ポートを傍受し、起動したばかりのZookeeperインスタンスに接続します.これも長期にわたって稼働しているサービスであり、端末でも稼働しなければならない.
# Start Kafka.  Run this command in its own terminal
$ ./bin/kafka-server-start ./etc/kafka/server.properties

これで、単一ノードkafkaクラスタが完全に稼働し、入力データの準備に着手し、最初のkafka Streamsサンプルプログラムを実行することができます.
1.4.入力データの準備
ヒント:この章では、組み込まれたコマンドラインツールを使用してkakfaデータを入力します.実際の使用では、自分のアプリケーションのKafkaクライアントなど、他の方法でKafkaにデータを書き込む必要があります.
次に、いくつかの入力データをKafkaのtopicに送信し、Kafka Streamsのアプリケーションによって後続の処理を行います.
まずstreams-file-inputという名前のtopicを作成します.
$ ./bin/kafka-topics --create \
          --zookeeper localhost:2181 \
          --replication-factor 1\
          --partitions 1\
          --topic streams-file-input

次に、ローカルファイル/tmp/file-input.に保存する入力データ病を生成します.txtで.
$ echo -e "all streams lead to kafka
hello kafka streams
join kafka summit" > /tmp/file-input.txt

生成されたファイルには、次の内容が含まれます.
all streams lead to kafka
hello kafka streams
join kafka summit

最後に、これらのデータをinput topicに送信します.
$ cat /tmp/file-input.txt | ./bin/kafka-console-producer --broker-list localhost:9092 --topic streams-file-input

Kafka consoleproducerはstdinからデータを読み出し、各ローを個別のメッセージとしてkafkaの入力ストリームに送信する.このメッセージのkeyはnullであり、メッセージは各行の内容であり、文字列符号化を用いる.
注意:このような一歩一歩の高速起動と実際のストリーム処理システムの違いを知りたいかもしれませんが、大規模なリアルタイムのストリーム処理システムでは、データは常に移動しており、高速入門の目的は機能証明だけです.簡単に言えば、エンドツーエンドのデータパイプは、KafkaおよびKafka Streamsの様々な態様で構築される.説教のために,我々はわざと迅速な入門を一連の連続的なステップに明確に分割した.
しかし、実際には、これらのステップは通常、いくつか異なり、同時的に存在するように見えます.例えば、入力データはローカルファイルからではなく、分散システムから直接送信され、Kafkaに連続的に書き込まれる可能性があります.同様に、ストリーム処理アプリケーションは、最初のローのデータ送信前に起動して実行される可能性があります.
 
1.5.KafkaStreamsでの入力データの処理
これで、Kafka Streamsベースのjavaアプリケーションを最初に実行できる入力データが生成されました.
ApacheKafkaを使用したWordCountプレゼンテーションアプリケーションを実行します.WordCountアルゴリズムを実現し、入力テキストからヒストグラムを計算します.しかし、前に見た他の操作がデータにバインドされているWordCountインスタンスプログラムとは異なり、このサンプルプログラムはデータが無境界で無限に流れています.境界アルゴリズムの変体と似ていて、彼は状態のあるアルゴリズムで、wordのカウンタを追跡して更新します.その後、無境界の入力データを受け入れる必要があるため、現在の状態と計算結果を周期的に低く出力し、すべてのデータが処理されているかどうか分からないため、より多くのデータを処理し続けます.これが彼とHadoopのMapeduceアルゴリズムの間の典型的な違いである.この違いを理解すると、実際の出力データをチェックすると、より受け入れやすくなります.
wordCountサンプルプログラムはkafkaとパッケージ化されているため、KafkaのBrokerに統合されています.これは、Javaソースコードをコンパイルする必要がなく、追加のことをする必要がなく、実行できることを意味します.
# Run the WordCount demo application.  There won't be any STDOUT output.
# You can safely ignore any WARN log messages.
$ ./bin/kafka-run-class org.apache.kafka.streams.examples.wordcount.WordCountDemo

ここではマジック的な配置はありませんが、実際には、kafkaStreamsライブラリ内の任意のアプリケーションを使用して、通常のJavaアプリケーションを起動するように、スクリプトkafka-run-classも簡単なjava-cpコマンドのパッケージにすぎません.
このWordCountサンプルプログラムは、入力topicからデータを読み出し、wordCountを計算し、計算結果を出力し続けます.プレゼンテーションは数秒実行され、他の一般的なストリーム処理アプリケーションとは異なり、自動的に終了します.
1.6.出力結果の確認
この章では、組み込まれたコマンドラインツールを使用してkafkaからデータを手動で読み込みます.実際の使用では、Kakfaクライアントを介してKafkaからデータを読み込むことができます.たとえば、自分のアプリケーションでKafkaクライアントを使用してKakfaから他のデータシステムにデータを移行できる場合.
これで、kafka出力topicからデータを読み出し、wordcountインスタンスの実行結果を確認できます.
./bin/kafka-console-consumer --zookeeper localhost:2181 \
          --topic streams-wordcount-output \
          --from-beginning \
          --formatter kafka.tools.DefaultMessageFormatter \
          --property print.key=true\
          --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
          --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

Wordcountのデータは次のコンソールに印刷されます.
all     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2
join    1
kafka   3
summit  1

ここで、第1列はKafkaメッセージのkeyの文字列フォーマットであり、第2列はメッセージの値、longタイプである.Ctrl+cコマンドでコンソール出力を終了できます.
でもちょっと待って、出力が変に見えますか?重複するエントリが表示されるのはなぜですか?例えばstreamsは2回現れました
# Why not this, you may ask?
all     1
lead    1
to      1
hello   1
streams 2
join    1
kafka   3
summit  1

上記の出力についての説明は、wordCountアプリケーションの出力は、実際には継続的に更新されるストリームであり、各行のレコードは、Kafkaなどの単一のwordのカウントである.同じキーの複数のレコードに対して、そのレコードの後に前のレコードが更新されます.
 
次の2つの図は、出力後に何が起こったかを説明します.第1列はKTableすなわちcountByKeyのカウントの現在の状態の進化を示す.2列目は、状態更新からKTableへの結果と最終結果を示し、KTable#通Stream()からKStreamへのレコードが生成されると、その結果がKafkaに出力される.
まず、テキスト行「kafkaへのすべてのストリーム」が処理中であり、各新しいTable項目の新しい単語の結果がKTableオブジェクト(緑色のハイライト表示部分)として構築され、対応する変化結果が下流のKStreamに送信される.


2番目のテキスト航のhello kafkastreamsが処理されると,比較的初めて既存のエントリKTableが更新されることが観察された(KafakとStreamsの2つの単語).変更されたレコードは、ここでKStreamに送信されます.
これは、上述したKStreamの2列目に表示される情報が、なぜ出力されたtopicに表示される内容が、変化を含む完全な内容であるため、
all     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2
join    1
kafka   3
summit  1

次の展望はこの例の範囲を超えている.Kafka StremsはKTableとchanglogのKStreamを使用しており、changlog StreamとKTableの間の二元性を利用して、KStreamテーブルの最初から最後までの完全なchangelogを公開することができ、KTableの内容を再構築することができます.
1.7.Kafkaクラスタの停止
クイックエントリーが完了すると、次の手順でKafkaクラスタを閉じることができます.
1.実行する端末でCtrl+cを使用してKafkaBrokerを停止したり、プロセスを殺したりする.
2.他の端末では、Ctrl+Cを使用してZookeeperインスタンスを停止したり、プロセスを殺したりする
おめでとうございます.Kafka Streamsの最初のアプリケーションを実行し、単一ノードのkafkaクラスタにデータを保存しました.Yeah!
1.8.次はどこへ行こう
次のステップでは、次のことをお勧めします.
1、KafkaStreamsアーキテクチャを読み、その主な概念と設計原則を理解する.
2、KafkaStreams開発ガイドを深く読み、ここにはkafka StreamsのDSLなどの各種ドキュメントが含まれている.これらはKafka Streamsの最初のアプリケーションを作成するのに役立ちます.
Kafka Streamsを処理するには、次のようなことにも興味があるかもしれません.
1、kafka Connectツール、kakfaおよびその他のデータシステムではHadoopでデータを移行する必要があります.
2、Kafka Clientから自分のアプリケーションにデータを読み込み、書き込みます.