kafkaのインストールと簡単な使用
6872 ワード
概要
本文は主にkafkaのインストールと簡単な使用を紹介して、その中zookeeprはkafkaが持参したので、本文は基本的に公式サイトのドキュメントに従ってインストールして起動して、その前にscalaシステム環境をインストールする必要があります:centos 7
1、ダウンロードは直接公式サイトでscalaに対応するバージョン(ソースファイルにダウンロードしないでください)をダウンロードすることができて、みんなは自分の実情によって対応するバージョンを選ぶことができます.次のコマンドを実行すると、ローカルにダウンロードできます.
2、指定したディレクトリに解凍する
3、起動サービス3.1 zookeeper kafkaを起動してzookeeperを使用するため、マシンにzookeeperサービスがない場合は、まずzookpeerサービスを起動する必要があります.ここではkafkaが持っているzookeeperを使用します.
起動するといくつかのログがあります.次は最後に出したログです.
zookeeperサービスのポートは2181であることがわかります.
3.2 kafkaサービスを起動して第2の端末を開く
ログ情報の一部
4、一つのテーマを作成して三つ目の端末を開く
リストtopicコマンドですべてのトピックを表示できます
または、エージェントを手動でトピックを作成するのではなく、存在しないトピックをパブリッシュするときに自動的にトピックを作成するように構成することもできます.
5.送信メッセージKafkaは、ファイルまたは標準入力から入力を取得し、メッセージとしてKafkaクラスタに送信するコマンドラインクライアントを有する.デフォルトでは、各行は個別のメッセージとして送信されます.生産者の起動
サーバに送信するメッセージをいくつか入力
6、消費者を起動し、消費者はメッセージを標準出力に転送して第4の端末を開くことができる
生産者からのメッセージをコマンドラインで見ることができます
8、複数のbrokerのクラスタを設定して今まで、私たちは単一のbrokerを設定していました.これはよくありません.次はこの機械に3つのノードを設定します.
8.1 brokerごとにプロファイルを作成する
次にvimでconfig/server-1を変更する.properties:
config/server-2.properties:
(listenersは前の注釈である#を取り除く必要がある)broker.idはクラスタ内の各ノードの一意で永続的な名前です.私たちは実際に同じマシンでこれらのファイルを実行しているので、ポートの競合とデータの上書きを避けるために、ポートとログディレクトリを書き直さなければなりません.
8.2新しいノードを起動1つのノードを起動しました(broker.id=0)、2つの新しいノードを起動します
8.3 3コピーの新規トピックを作成する
Describe topicsを実行してトピックの情報を表示
これらの出力について説明します.最初の行はすべてのパーティションの要約であり、別の行は1つのパーティションの情報であり、このトピックには1つのパーティションしかないので、1行しかありません.リーダー:すべての読み取りと書き込みを担当し、このパーティションはすべてのノードからランダムに選択されます.Replicas:このパーティションのログをコピーするノードのリストです.リーダーであっても、彼らが今も生きていても.isr:同期レプリカの集合であり、まだ生きているレプリカの自分であり、leaderに捕獲される(caught-up).
私の例では、ノード0は、このトピックの一意のパーティションのleaderです.以前に作成したトピックtestを同じコマンドで表示できます.
8.4メッセージの送信
my test message 1 my test message 2
8、5消費情報
8.6テストフォールトトレランスと発見した問題私のリーダーはノード0で、今killを落とします
対応するプロセス番号を取得
そしてkillは1つ目を落とします(2つ目はgrepコマンド自体です)
killが落ちたらtopicについて説明します
これでリーダーはノード1に切り替わり、Isrもノード1と2しかありません
公式サイトでは、元のリーダーが失敗しても、これらのメッセージは消費に使用できますが、次のピットがあります.まず、次の文を実行します.
誤報を発見する
本文は主にkafkaのインストールと簡単な使用を紹介して、その中zookeeprはkafkaが持参したので、本文は基本的に公式サイトのドキュメントに従ってインストールして起動して、その前にscalaシステム環境をインストールする必要があります:centos 7
1、ダウンロードは直接公式サイトでscalaに対応するバージョン(ソースファイルにダウンロードしないでください)をダウンロードすることができて、みんなは自分の実情によって対応するバージョンを選ぶことができます.次のコマンドを実行すると、ローカルにダウンロードできます.
wget https://www.apache.org/dyn/closer.cgi?path=/kafka/2.3.0/kafka_2.12-2.3.0.tgz
2、指定したディレクトリに解凍する
tar -zxvf kafka_2.12-2.3.0.tgz -C /opt/
3、起動サービス3.1 zookeeper kafkaを起動してzookeeperを使用するため、マシンにzookeeperサービスがない場合は、まずzookpeerサービスを起動する必要があります.ここではkafkaが持っているzookeeperを使用します.
cd /opt/kafka_2.12-2.3.0/
bin/zookeeper-server-start.sh config/zookeeper.properties
起動するといくつかのログがあります.次は最後に出したログです.
[2019-08-23 17:12:32,813] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2019-08-23 17:12:41,375] INFO Expiring session 0x100063d407b0000, timeout of 6000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
[2019-08-23 17:12:41,382] INFO Processed session termination for sessionid: 0x100063d407b0000 (org.apache.zookeeper.server.PrepRequestProcessor)
[2019-08-23 17:12:41,383] INFO Creating new log file: log.170 (org.apache.zookeeper.server.persistence.FileTxnLog)
zookeeperサービスのポートは2181であることがわかります.
3.2 kafkaサービスを起動して第2の端末を開く
bin/kafka-server-start.sh config/server.properties
ログ情報の一部
[2019-08-23 17:14:57,616] INFO [GroupMetadataManager brokerId=0] Finished loading offsets and group metadata from __consumer_offsets-39 in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-08-23 17:14:57,617] INFO [GroupMetadataManager brokerId=0] Finished loading offsets and group metadata from __consumer_offsets-42 in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-08-23 17:14:57,617] INFO [GroupMetadataManager brokerId=0] Finished loading offsets and group metadata from __consumer_offsets-45 in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-08-23 17:14:57,682] INFO [GroupCoordinator 0]: Loading group metadata for test with generation 2 (kafka.coordinator.group.GroupCoordinator)
[2019-08-23 17:14:57,683] INFO [GroupMetadataManager brokerId=0] Finished loading offsets and group metadata from __consumer_offsets-48 in 66 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
4、一つのテーマを作成して三つ目の端末を開く
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
リストtopicコマンドですべてのトピックを表示できます
bin/kafka-topics.sh --list --zookeeper localhost:2181
test
または、エージェントを手動でトピックを作成するのではなく、存在しないトピックをパブリッシュするときに自動的にトピックを作成するように構成することもできます.
5.送信メッセージKafkaは、ファイルまたは標準入力から入力を取得し、メッセージとしてKafkaクラスタに送信するコマンドラインクライアントを有する.デフォルトでは、各行は個別のメッセージとして送信されます.生産者の起動
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
サーバに送信するメッセージをいくつか入力
>hello
>kafka
6、消費者を起動し、消費者はメッセージを標準出力に転送して第4の端末を開くことができる
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
生産者からのメッセージをコマンドラインで見ることができます
hello
kafka
8、複数のbrokerのクラスタを設定して今まで、私たちは単一のbrokerを設定していました.これはよくありません.次はこの機械に3つのノードを設定します.
8.1 brokerごとにプロファイルを作成する
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
次にvimでconfig/server-1を変更する.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
(listenersは前の注釈である#を取り除く必要がある)broker.idはクラスタ内の各ノードの一意で永続的な名前です.私たちは実際に同じマシンでこれらのファイルを実行しているので、ポートの競合とデータの上書きを避けるために、ポートとログディレクトリを書き直さなければなりません.
8.2新しいノードを起動1つのノードを起動しました(broker.id=0)、2つの新しいノードを起動します
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
8.3 3コピーの新規トピックを作成する
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Describe topicsを実行してトピックの情報を表示
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
これらの出力について説明します.最初の行はすべてのパーティションの要約であり、別の行は1つのパーティションの情報であり、このトピックには1つのパーティションしかないので、1行しかありません.リーダー:すべての読み取りと書き込みを担当し、このパーティションはすべてのノードからランダムに選択されます.Replicas:このパーティションのログをコピーするノードのリストです.リーダーであっても、彼らが今も生きていても.isr:同期レプリカの集合であり、まだ生きているレプリカの自分であり、leaderに捕獲される(caught-up).
私の例では、ノード0は、このトピックの一意のパーティションのleaderです.以前に作成したトピックtestを同じコマンドで表示できます.
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
8.4メッセージの送信
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
my test message 1 my test message 2
8、5消費情報
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
my test message 1
my test message 2
8.6テストフォールトトレランスと発見した問題私のリーダーはノード0で、今killを落とします
ps aux | grep server.properties
対応するプロセス番号を取得
root 9716 5.9 54.4 4276552 271584 pts/1 Sl+ 17:14 0:28 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true -Xloggc:/home/vagrant/kafka_2.11-
そしてkillは1つ目を落とします(2つ目はgrepコマンド自体です)
kill -9 4276552
killが落ちたらtopicについて説明します
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 0,1,2 Isr: 1,2
これでリーダーはノード1に切り替わり、Isrもノード1と2しかありません
公式サイトでは、元のリーダーが失敗しても、これらのメッセージは消費に使用できますが、次のピットがあります.まず、次の文を実行します.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
誤報を発見する
[2018-05-22 03:55:13,304] WARN [Consumer clientId=consumer-1, groupId=console-consumer-29320] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)