Kafka快速構築ガイド


1.Kafkaをダウンロード
wget tar zxvf kafka_2.11-0.10.0.0.tgz
cd kafka_2.11-0.10.0.0

2.サービス・エンドの起動
KafkaはZooKeeprを使用する必要があるので、まずZooKeeprサービス・エンドを起動する必要があります.個別のZooKeeperサービス・エンドがない場合は、Kafkaが持参したスクリプトを使用して、単一ノードZooKeeprインスタンスを迅速に起動できます.
# bin/zookeeper-server-start.sh config/zookeeper.properties

Kafkaサービス・エンド・インスタンスの起動
# bin/kafka-server-start.sh config/server.properties

3.Kafka Topicを作成する
testという名前のtopicを作成します.このtopicには1つのパーティションと1つのコピーしかありません.
# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partition 1 --topic test

作成後に表示されます
# bin/kafka-topics.sh --list  --zookeeper localhost:2181

また、手動でtopicsを作成する以外は、Brokersが自動的にtopicsを作成するように構成できます.
4.メッセージを送信
Kafkaにはコマンドラインツールkafka-console-producerがあります.shは、1つのファイルまたは標準入力からデータを読み込み、Kafkaクラスタにメッセージで送信することができる.デフォルトでは、各行はメッセージとして個別に送信されます.
# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test2
This is a message
This is another message

5.消費者を起動する
Kafkaはまた、Kafkaクラスタのメッセージを消費するためのコマンドラインツールを備えている.
# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test2 --from-beginning

Producerとconsumerはそれぞれ2つのLinu端末を開いて、producer端はいくつかの内容を入力して、consumer端はproducer入力の内容をリアルタイムで表示することができます
6.multi-brokerクラスタの構築
以上,単一Kafka brokerを実行することを確立したが,Kafkaの場合,単一brokerはKafkaクラスタの1つのメンバにすぎず,次にKafkaクラスタを3つのbrokerインスタンスに拡張する.
#cp config/server.properties config/server1.properties
#cp config/server.properties config/server2.properties

主にいくつかのパラメータを変更
broker.id
listeners
log.dirs
broker.idはクラスタ内のKafka brokerインスタンスの唯一の属であり、各インスタンスは異なる必要がある.
bin/kafka-server-start.sh config/server1.properties &
bin/kafka-server-start.sh config/server2.properties &

3つのレプリケーション・メンバーを持つtopicを作成します.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

クラスタ内の各brokerのステータスの表示
# bin/kafka-topics.sh --describe --zookeeper localhost:2181  --topic my-replicated-topic
Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: my-replicated-topic	Partition: 0	Leader: 0	Replicas: 0,2,1	Isr: 0,2,1

まず、すべてのパーティションのサマリー情報をリストし、1つのパーティションに1行を表示します.
リーダーは、指定されたパーティションのすべての読み書き操作を担当し、各クラスタノードは、すべてのパーティションでランダムに選択されたパーティションのリーダーになります.
Replicasは、現在のパーティションのレプリケーションノードをリストします.これらのノードがLeaderであるかどうか、または現在生存しているかどうかにかかわらず.
Isr in-syncレプリケーションセットのサブセット.現在生存し、leaderに追いつくクラスタノードをリストします.
新しいtopicにメッセージを送信
# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic

メッセージを入力
もう一つの端末でこれらのメッセージを消費します
# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic

入力されたメッセージはすぐに消費しないでください.
構築したKafka Clusterのフォールトトレランス能力をテストします
# bin/kafka-topics.sh --describe --zookeeper localhost:2181  --topic my-replicated-topic
Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: my-replicated-topic	Partition: 0	Leader: 0	Replicas: 0,2,1	Isr: 0,1,2

Leader:0はBroker 0がleader、Broker 1とBroker 2がreplica
Broker 0をkillに落とす
# kill -9 $(ps -ef|grep server.properties|grep -v grep|awk '{print $2}')

クラスタノードのステータスの再表示
# bin/kafka-topics.sh --describe --zookeeper localhost:2181  --topic my-replicated-topic
Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: my-replicated-topic	Partition: 0	Leader: 2	Replicas: 0,2,1	Isr: 1,2

リーダーがBroker 2に切り替えられていることがわかります.Broker 0はもうIsrというサブセットにはありません.Replicasはまだ3人のメンバーです.
消費者の再実行
# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic

以前の生産者から送信されたメッセージは依然として消費されていることがわかりますが、メッセージの書き込みを担当するリーダーはすでに切られています.
7.Kfaka Connectでエクスポートデータをインポートする
Linux端末からデータを読み込み、端末にデータを出力してKafkaを理解しやすくし、次にKafkaを使用して他のデータソースからデータをインポートし、他のシステムにデータをエクスポートします.多くのシステムでは、カスタムコードを書き直す代わりに、Kafka Connectを使用してエクスポートデータをインポートできます.Kafka Connectは、エクスポートデータをインポートするためのKafka独自のツールです.
まず1つのファイルにデータを書き込む
echo -e "foo
bar" > test.txt

次に、2つのKafkaコネクタを起動してstandaloneモードで実行します.
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

connect-standalone.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

connect-file-source.properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test

connect-file-sink.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test

Kafka Connectプロセスを開始すると、source connectorはtestから起動します.txtでデータを読み込む、connect-testというtopicにプッシュすると、sink connectorはこのtopicからデータを読み出してtestに書き込む.sink.txtファイル
# cat test.sink.txt 
foo
bar

メッセージはconnect-testというtopicに格納されます
# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic connect-test --from-beginning 
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}

コネクタの端末は切らずにtestに進みます.txtファイルに数行追加し、console-consumerがリフレッシュされるかどうかを確認します
# echo "Another line" >> test.txt
# cat test.sink.txt 
foo
bar
Another line

見えますsink.txtこのファイルも1行増えました
8.Use Kafka Streams to process data
Kafka Streamsは、Kafkaがリアルタイムストリームデータ処理およびKafka brokersに格納されているデータの分析に使用するクライアント・ライブラリです.
#echo -e "all streams lead to kafka
hello kafka streams
join kafka summit" > file-input.txt
# cat file-input.txt | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input
# ./bin/kafka-run-class.sh  org.apache.kafka.streams.examples.wordcount.WordCountDemo
# ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
> --topic streams-wordcount-output \
> --from-beginning \
> --formatter kafka.tools.DefaultMessageFormatter \
> --property print.key=true \
> --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
> --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

しゅつりょく
all	1
streams	1
lead	1
to	1
kafka	1
hello	1
kafka	2
streams	2
join	1
kafka	3
summit	1

参照ドキュメント:
http://kafka.apache.org/documentation.html#quickstart