Kafka快速構築ガイド
1.Kafkaをダウンロード
2.サービス・エンドの起動
KafkaはZooKeeprを使用する必要があるので、まずZooKeeprサービス・エンドを起動する必要があります.個別のZooKeeperサービス・エンドがない場合は、Kafkaが持参したスクリプトを使用して、単一ノードZooKeeprインスタンスを迅速に起動できます.
Kafkaサービス・エンド・インスタンスの起動
3.Kafka Topicを作成する
testという名前のtopicを作成します.このtopicには1つのパーティションと1つのコピーしかありません.
作成後に表示されます
また、手動でtopicsを作成する以外は、Brokersが自動的にtopicsを作成するように構成できます.
4.メッセージを送信
Kafkaにはコマンドラインツールkafka-console-producerがあります.shは、1つのファイルまたは標準入力からデータを読み込み、Kafkaクラスタにメッセージで送信することができる.デフォルトでは、各行はメッセージとして個別に送信されます.
5.消費者を起動する
Kafkaはまた、Kafkaクラスタのメッセージを消費するためのコマンドラインツールを備えている.
Producerとconsumerはそれぞれ2つのLinu端末を開いて、producer端はいくつかの内容を入力して、consumer端はproducer入力の内容をリアルタイムで表示することができます
6.multi-brokerクラスタの構築
以上,単一Kafka brokerを実行することを確立したが,Kafkaの場合,単一brokerはKafkaクラスタの1つのメンバにすぎず,次にKafkaクラスタを3つのbrokerインスタンスに拡張する.
主にいくつかのパラメータを変更
broker.id
listeners
log.dirs
broker.idはクラスタ内のKafka brokerインスタンスの唯一の属であり、各インスタンスは異なる必要がある.
3つのレプリケーション・メンバーを持つtopicを作成します.
クラスタ内の各brokerのステータスの表示
まず、すべてのパーティションのサマリー情報をリストし、1つのパーティションに1行を表示します.
リーダーは、指定されたパーティションのすべての読み書き操作を担当し、各クラスタノードは、すべてのパーティションでランダムに選択されたパーティションのリーダーになります.
Replicasは、現在のパーティションのレプリケーションノードをリストします.これらのノードがLeaderであるかどうか、または現在生存しているかどうかにかかわらず.
Isr in-syncレプリケーションセットのサブセット.現在生存し、leaderに追いつくクラスタノードをリストします.
新しいtopicにメッセージを送信
メッセージを入力
もう一つの端末でこれらのメッセージを消費します
入力されたメッセージはすぐに消費しないでください.
構築したKafka Clusterのフォールトトレランス能力をテストします
Leader:0はBroker 0がleader、Broker 1とBroker 2がreplica
Broker 0をkillに落とす
クラスタノードのステータスの再表示
リーダーがBroker 2に切り替えられていることがわかります.Broker 0はもうIsrというサブセットにはありません.Replicasはまだ3人のメンバーです.
消費者の再実行
以前の生産者から送信されたメッセージは依然として消費されていることがわかりますが、メッセージの書き込みを担当するリーダーはすでに切られています.
7.Kfaka Connectでエクスポートデータをインポートする
Linux端末からデータを読み込み、端末にデータを出力してKafkaを理解しやすくし、次にKafkaを使用して他のデータソースからデータをインポートし、他のシステムにデータをエクスポートします.多くのシステムでは、カスタムコードを書き直す代わりに、Kafka Connectを使用してエクスポートデータをインポートできます.Kafka Connectは、エクスポートデータをインポートするためのKafka独自のツールです.
まず1つのファイルにデータを書き込む
次に、2つのKafkaコネクタを起動してstandaloneモードで実行します.
connect-standalone.properties
connect-file-source.properties
connect-file-sink.properties
Kafka Connectプロセスを開始すると、source connectorはtestから起動します.txtでデータを読み込む、connect-testというtopicにプッシュすると、sink connectorはこのtopicからデータを読み出してtestに書き込む.sink.txtファイル
メッセージはconnect-testというtopicに格納されます
コネクタの端末は切らずにtestに進みます.txtファイルに数行追加し、console-consumerがリフレッシュされるかどうかを確認します
見えますsink.txtこのファイルも1行増えました
8.Use Kafka Streams to process data
Kafka Streamsは、Kafkaがリアルタイムストリームデータ処理およびKafka brokersに格納されているデータの分析に使用するクライアント・ライブラリです.
しゅつりょく
参照ドキュメント:
http://kafka.apache.org/documentation.html#quickstart
wget tar zxvf kafka_2.11-0.10.0.0.tgz
cd kafka_2.11-0.10.0.0
2.サービス・エンドの起動
KafkaはZooKeeprを使用する必要があるので、まずZooKeeprサービス・エンドを起動する必要があります.個別のZooKeeperサービス・エンドがない場合は、Kafkaが持参したスクリプトを使用して、単一ノードZooKeeprインスタンスを迅速に起動できます.
# bin/zookeeper-server-start.sh config/zookeeper.properties
Kafkaサービス・エンド・インスタンスの起動
# bin/kafka-server-start.sh config/server.properties
3.Kafka Topicを作成する
testという名前のtopicを作成します.このtopicには1つのパーティションと1つのコピーしかありません.
# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partition 1 --topic test
作成後に表示されます
# bin/kafka-topics.sh --list --zookeeper localhost:2181
また、手動でtopicsを作成する以外は、Brokersが自動的にtopicsを作成するように構成できます.
4.メッセージを送信
Kafkaにはコマンドラインツールkafka-console-producerがあります.shは、1つのファイルまたは標準入力からデータを読み込み、Kafkaクラスタにメッセージで送信することができる.デフォルトでは、各行はメッセージとして個別に送信されます.
# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test2
This is a message
This is another message
5.消費者を起動する
Kafkaはまた、Kafkaクラスタのメッセージを消費するためのコマンドラインツールを備えている.
# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test2 --from-beginning
Producerとconsumerはそれぞれ2つのLinu端末を開いて、producer端はいくつかの内容を入力して、consumer端はproducer入力の内容をリアルタイムで表示することができます
6.multi-brokerクラスタの構築
以上,単一Kafka brokerを実行することを確立したが,Kafkaの場合,単一brokerはKafkaクラスタの1つのメンバにすぎず,次にKafkaクラスタを3つのbrokerインスタンスに拡張する.
#cp config/server.properties config/server1.properties
#cp config/server.properties config/server2.properties
主にいくつかのパラメータを変更
broker.id
listeners
log.dirs
broker.idはクラスタ内のKafka brokerインスタンスの唯一の属であり、各インスタンスは異なる必要がある.
bin/kafka-server-start.sh config/server1.properties &
bin/kafka-server-start.sh config/server2.properties &
3つのレプリケーション・メンバーを持つtopicを作成します.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
クラスタ内の各brokerのステータスの表示
# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
まず、すべてのパーティションのサマリー情報をリストし、1つのパーティションに1行を表示します.
リーダーは、指定されたパーティションのすべての読み書き操作を担当し、各クラスタノードは、すべてのパーティションでランダムに選択されたパーティションのリーダーになります.
Replicasは、現在のパーティションのレプリケーションノードをリストします.これらのノードがLeaderであるかどうか、または現在生存しているかどうかにかかわらず.
Isr in-syncレプリケーションセットのサブセット.現在生存し、leaderに追いつくクラスタノードをリストします.
新しいtopicにメッセージを送信
# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
メッセージを入力
もう一つの端末でこれらのメッセージを消費します
# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
入力されたメッセージはすぐに消費しないでください.
構築したKafka Clusterのフォールトトレランス能力をテストします
# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,1,2
Leader:0はBroker 0がleader、Broker 1とBroker 2がreplica
Broker 0をkillに落とす
# kill -9 $(ps -ef|grep server.properties|grep -v grep|awk '{print $2}')
クラスタノードのステータスの再表示
# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 0,2,1 Isr: 1,2
リーダーがBroker 2に切り替えられていることがわかります.Broker 0はもうIsrというサブセットにはありません.Replicasはまだ3人のメンバーです.
消費者の再実行
# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
以前の生産者から送信されたメッセージは依然として消費されていることがわかりますが、メッセージの書き込みを担当するリーダーはすでに切られています.
7.Kfaka Connectでエクスポートデータをインポートする
Linux端末からデータを読み込み、端末にデータを出力してKafkaを理解しやすくし、次にKafkaを使用して他のデータソースからデータをインポートし、他のシステムにデータをエクスポートします.多くのシステムでは、カスタムコードを書き直す代わりに、Kafka Connectを使用してエクスポートデータをインポートできます.Kafka Connectは、エクスポートデータをインポートするためのKafka独自のツールです.
まず1つのファイルにデータを書き込む
echo -e "foo
bar" > test.txt
次に、2つのKafkaコネクタを起動してstandaloneモードで実行します.
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
connect-standalone.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
connect-file-source.properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
connect-file-sink.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
Kafka Connectプロセスを開始すると、source connectorはtestから起動します.txtでデータを読み込む、connect-testというtopicにプッシュすると、sink connectorはこのtopicからデータを読み出してtestに書き込む.sink.txtファイル
# cat test.sink.txt
foo
bar
メッセージはconnect-testというtopicに格納されます
# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
コネクタの端末は切らずにtestに進みます.txtファイルに数行追加し、console-consumerがリフレッシュされるかどうかを確認します
# echo "Another line" >> test.txt
# cat test.sink.txt
foo
bar
Another line
見えますsink.txtこのファイルも1行増えました
8.Use Kafka Streams to process data
Kafka Streamsは、Kafkaがリアルタイムストリームデータ処理およびKafka brokersに格納されているデータの分析に使用するクライアント・ライブラリです.
#echo -e "all streams lead to kafka
hello kafka streams
join kafka summit" > file-input.txt
# cat file-input.txt | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input
# ./bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
# ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
> --topic streams-wordcount-output \
> --from-beginning \
> --formatter kafka.tools.DefaultMessageFormatter \
> --property print.key=true \
> --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
> --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
しゅつりょく
all 1
streams 1
lead 1
to 1
kafka 1
hello 1
kafka 2
streams 2
join 1
kafka 3
summit 1
参照ドキュメント:
http://kafka.apache.org/documentation.html#quickstart