新しいバージョンのZookeeperを使って、kafkaクラスタを構築します.
18452 ワード
インストールの簡単な説明
新しいバージョンのkafkaはzookeeperを持参していますが、実際にはzookeeperは完全に使用できます.この記事は自分のzookeeperを使ってkafkaクラスタを構築することを記録します.
1、kafkaのダウンロードについて
kafkaダウンロードページ:http://kafka.apache.org/downloads
2、kafkaのプロファイルを修正する
kafkaはカタログの下のconfigフォルダをインストールして、その配置ファイルのために、私達が修正したいのはserver.propertiesとzookeeper.propertiesです.
2.1、ログディレクトリの変更
最初にkafkaのログディレクトリとzookeeperデータディレクトリを修正します.この二つのデフォルトはtmpディレクトリに置いています.tmpディレクトリの内容は再起動とともに失われます.
server.properties:
log.dirs=/opt/kafka-logs
に変更
log.dirs=/opt/kafka/logs
zookeeper.properties
dataDir=/opt/zookeeper
に変更
dataDir=/opt/zookeeper/data
2.2、kafkaの配置
kafkaクラスタは、leaderを推選するのに便利なように、奇数台のサービスコンポーネントによって一般的にクラスタ化されており、ここでは3台を例にして、それぞれxxx.xx.xx.x.x.x.x.x.X 01、xxxx.xx.xx.xx.xx.03にサービスのipを表している.
(サーバIPはifconfigコマンドで閲覧可能)
server.propertiesを変更:
1、brook er.idを設置して、各brook erが唯一であることを保証します.1台目はデフォルトを0に変更しなくてもいいです.後ろの2台は修正が必要です.例えば1と2台に変更します.
2、num.partitionsを設置して、パーティション数は普通brookと一致しています.
3、advertised.listenersとlistenersを設定し、例えばlisteners=PLANTEXT://xxx.xxxx.xxxx.xx.9092
4、zookeeper.co nnectを設置し、3台のサービスzookeeper接続住所を配置し、例えばzookeeper.co nnect=xx.xxxx.xx.xx.xx.x.x.x.x.x.x.x.x.2:2181、xx.xxx.xx.xx 03:2181
zookeeper.propertiesを修正:
1、接続パラメータを設定し、以下の構成を追加します.
tickTime=2000
init Limit=10
syncLimit=5
2、brooker Idのサービスアドレスを設定する
server.0=xxx.xx.xx.x 01:2888:3888 server.1=xxx.xxx.xx.x 02:2888:3888 server.2=xxx.xx.xx.xx 03:2888:3888
zookeeperデータディレクトリにidの配置を追加します.
各台のサービスのzookeeperデータディレクトリにmyidファイルを追加し、サービスbrook er.id属性値を書き込みます.ここのカタログは/opt/zookeeper/dataです.
最初の台のbrook er.idは0のサービスです.このディレクトリの下で実行します.echo 0>myid
3、kafkaを起動する
kafka起動時にまずzookeeperを起動し、kafkaを起動する.闭じる时は逆に、とりあえずkafkaを闭じて、zookeeperを闭じます.
起動zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties&
起動kafka:
bin/kafka-server-start.sh config/server.properties&
4、kafkaクラスタをテストする
4.1、最初のサービスでtestテーマを作成する
bin/kafka-topics.sh--create--topic test--zookeeper xx.xxx.xx.x 01:2181--replication-factor 3--partion 3
4.2、展示テーマ、テーマの作成成功を確認する
bin/kafka-topics.sh--list--zookeeperxx.xxx.xx.x.x 01:2181
4.3、生産者の創建
bin/kafka-consolie-producer.sh--brooker-listxx.xxx.xxx.xA:9092--topic test
4.4、消費者を創建し、他の2台のサービスでそれぞれ消費者を創建する.
bin/kafka-consolie-consumer.sh--zookeeper xx.xxx.xxx.x 02:2121--topic test--from-beginning
bin/kafka-consolie-consumer.sh--zookeeper xxx.xxx.xx.xx 03:2181--topic test--from-beginning
4.5、テストメッセージの発表と消費
IPがxxxx.xxx.x.x.x 01のサービス生産中にメッセージを入力して返送し、サービス02と03の中の消費者が受信したかどうか確認する.
5、その他
設定が完了したらポートや他の設定を変更する必要がありますが、エラーが発生しました.kafka(/opt/kafka/logs)とzookeeper(/opt/zookeeper/data)のバッファディレクトリの内容を確認して再起動することができます.
転載先:https://www.cnblogs.com/fameg/p/9827169.html
新しいバージョンのkafkaはzookeeperを持参していますが、実際にはzookeeperは完全に使用できます.この記事は自分のzookeeperを使ってkafkaクラスタを構築することを記録します.
1、kafkaのダウンロードについて
kafkaダウンロードページ:http://kafka.apache.org/downloads
2、kafkaのプロファイルを修正する
kafkaはカタログの下のconfigフォルダをインストールして、その配置ファイルのために、私達が修正したいのはserver.propertiesとzookeeper.propertiesです.
2.1、ログディレクトリの変更
最初にkafkaのログディレクトリとzookeeperデータディレクトリを修正します.この二つのデフォルトはtmpディレクトリに置いています.tmpディレクトリの内容は再起動とともに失われます.
server.properties:
log.dirs=/opt/kafka-logs
に変更
log.dirs=/opt/kafka/logs
zookeeper.properties
dataDir=/opt/zookeeper
に変更
dataDir=/opt/zookeeper/data
2.2、kafkaの配置
kafkaクラスタは、leaderを推選するのに便利なように、奇数台のサービスコンポーネントによって一般的にクラスタ化されており、ここでは3台を例にして、それぞれxxx.xx.xx.x.x.x.x.x.X 01、xxxx.xx.xx.xx.xx.03にサービスのipを表している.
(サーバIPはifconfigコマンドで閲覧可能)
server.propertiesを変更:
1、brook er.idを設置して、各brook erが唯一であることを保証します.1台目はデフォルトを0に変更しなくてもいいです.後ろの2台は修正が必要です.例えば1と2台に変更します.
2、num.partitionsを設置して、パーティション数は普通brookと一致しています.
3、advertised.listenersとlistenersを設定し、例えばlisteners=PLANTEXT://xxx.xxxx.xxxx.xx.9092
4、zookeeper.co nnectを設置し、3台のサービスzookeeper接続住所を配置し、例えばzookeeper.co nnect=xx.xxxx.xx.xx.xx.x.x.x.x.x.x.x.x.2:2181、xx.xxx.xx.xx 03:2181
zookeeper.propertiesを修正:
1、接続パラメータを設定し、以下の構成を追加します.
tickTime=2000
init Limit=10
syncLimit=5
2、brooker Idのサービスアドレスを設定する
server.0=xxx.xx.xx.x 01:2888:3888 server.1=xxx.xxx.xx.x 02:2888:3888 server.2=xxx.xx.xx.xx 03:2888:3888
zookeeperデータディレクトリにidの配置を追加します.
各台のサービスのzookeeperデータディレクトリにmyidファイルを追加し、サービスbrook er.id属性値を書き込みます.ここのカタログは/opt/zookeeper/dataです.
最初の台のbrook er.idは0のサービスです.このディレクトリの下で実行します.echo 0>myid
3、kafkaを起動する
kafka起動時にまずzookeeperを起動し、kafkaを起動する.闭じる时は逆に、とりあえずkafkaを闭じて、zookeeperを闭じます.
起動zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties&
起動kafka:
bin/kafka-server-start.sh config/server.properties&
4、kafkaクラスタをテストする
4.1、最初のサービスでtestテーマを作成する
bin/kafka-topics.sh--create--topic test--zookeeper xx.xxx.xx.x 01:2181--replication-factor 3--partion 3
4.2、展示テーマ、テーマの作成成功を確認する
bin/kafka-topics.sh--list--zookeeperxx.xxx.xx.x.x 01:2181
4.3、生産者の創建
bin/kafka-consolie-producer.sh--brooker-listxx.xxx.xxx.xA:9092--topic test
4.4、消費者を創建し、他の2台のサービスでそれぞれ消費者を創建する.
bin/kafka-consolie-consumer.sh--zookeeper xx.xxx.xxx.x 02:2121--topic test--from-beginning
bin/kafka-consolie-consumer.sh--zookeeper xxx.xxx.xx.xx 03:2181--topic test--from-beginning
4.5、テストメッセージの発表と消費
IPがxxxx.xxx.x.x.x 01のサービス生産中にメッセージを入力して返送し、サービス02と03の中の消費者が受信したかどうか確認する.
5、その他
設定が完了したらポートや他の設定を変更する必要がありますが、エラーが発生しました.kafka(/opt/kafka/logs)とzookeeper(/opt/zookeeper/data)のバッファディレクトリの内容を確認して再起動することができます.
BROKER
broker.id、log.dir、zookeeper.connect
## broker , 。 IP , broker.id consumers
broker.id =1
##kafka , /tmp/kafka-logs-1,/tmp/kafka-logs-2
log.dirs = /tmp/kafka-logs
##
port =6667
## ,
message.max.bytes =1000000
## broker ,
num.network.threads =3
## broker IO ,
num.io.threads =8
## , ,
background.threads =4
## IO , IO , ,
queued.max.requests =500
##broker , , , , , ZK,
host.name
## , , producers, consumers, broker ,
advertised.host.name
## , port
advertised.port
## socket ,socket SO_SNDBUFF
socket.send.buffer.bytes =100*1024
## socket ,socket SO_RCVBUFF
socket.receive.buffer.bytes =100*1024
## socket , serverOOM,message.max.bytes socket.request.max.bytes, topic
socket.request.max.bytes =100*1024*1024
------------------------------------------- LOG -------------------------------------------
## topic segment , segment , topic
log.segment.bytes =1024*1024*1024
## segment log.segment.bytes , segment topic
log.roll.hours =24*7
## :delete compact , , topic
log.cleanup.policy = delete
## log.cleanup.policy ,
## log.retention.bytes log.retention.minutes , , topic
log.retention.minutes=7days
, 1
log.cleanup.interval.mins=1
## topic , topic = *log.retention.bytes 。-1
## log.retention.bytes log.retention.minutes , , topic
log.retention.bytes=-1
## , log.cleanup.policy
log.retention.check.interval.ms=5minutes
##
log.cleaner.enable=false
##
log.cleaner.threads =1
##
log.cleaner.io.max.bytes.per.second=None
## , ,
log.cleaner.dedupe.buffer.size=500*1024*1024
## IO
log.cleaner.io.buffer.size=512*1024
## hash
log.cleaner.io.buffer.load.factor =0.9
##
log.cleaner.backoff.ms =15000
## , , , topic
log.cleaner.min.cleanable.ratio=0.5
## , , log.retention.minutes , 。 topic
log.cleaner.delete.retention.ms =1day
## segment , topic
log.index.size.max.bytes =10*1024*1024
## fetch , offset , , , ,
log.index.interval.bytes =4096
## log "sync"
## IO , " "
## , " " " " .
## , "fsync" (IO )
## , "fsync" , client .
## server , fsync .
log.flush.interval.messages=None
##
log.flush.scheduler.interval.ms =3000
## interval , .
## "fsync" , ,
## , .
log.flush.interval.ms = None
##
log.delete.delay.ms =60000
## ,
log.flush.offset.checkpoint.interval.ms =60000
------------------------------------------- TOPIC -------------------------------------------
## topic , false, topic
auto.create.topics.enable =true
## topic , replication , broker
default.replication.factor =1
## topic , topic topic
num.partitions =1
--replication-factor3--partitions1--topic replicated-topic : replicated-topic , broker 。
---------------------------------- (Leader、replicas) ----------------------------------
## partition leader replicas ,socket
controller.socket.timeout.ms =30000
## partition leader replicas ,
controller.message.queue.size=10
## replicas partition leader , , replicas ISR(in-sync replicas), ,
replica.lag.time.max.ms =10000
## follower leader , follower[ partition relicas]
## , follower leader , , replicas
## ,leader follower , replicas
## follower .
## broker , , .
replica.lag.max.messages =4000
##follower leader socket
replica.socket.timeout.ms=30*1000
## leader socket
replica.socket.receive.buffer.bytes=64*1024
## replicas
replica.fetch.max.bytes =1024*1024
## replicas leader ,
replica.fetch.wait.max.ms =500
## fetch , leader , ,
replica.fetch.min.bytes =1
## leader , follower IO
num.replica.fetchers=1
## replica
replica.high.watermark.checkpoint.interval.ms =5000
## broker , true, broker leader, broker
controlled.shutdown.enable =false
##
controlled.shutdown.max.retries =3
##
controlled.shutdown.retry.backoff.ms =5000
## broker
auto.leader.rebalance.enable =false
## leader , ,
leader.imbalance.per.broker.percentage =10
## leader
leader.imbalance.check.interval.seconds =300
## offset
offset.metadata.max.bytes
----------------------------------ZooKeeper ----------------------------------
##zookeeper , , hostname1:port1,hostname2:port2,hostname3:port3
zookeeper.connect = localhost:2181
## ZooKeeper , , , ,
zookeeper.session.timeout.ms=6000
## ZooKeeper
zookeeper.connection.timeout.ms =6000
## ZooKeeper leader follower
zookeeper.sync.time.ms =2000
topic ,
bin/kafka-topics.sh --zookeeper localhost:2181--create --topic my-topic --partitions1--replication-factor1--config max.message.bytes=64000--config flush.messages=1
bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --config max.message.bytes=128000
:
bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --deleteConfig max.message.bytes
Consumer
group.id、zookeeper.connect
## Consumer ID,broker group.id ,
group.id
## ID, ,
consumer.id
## ID , group.id
client.id = group id value
## zookeeper , hostname1:port1,hostname2:port2,hostname3:port3 broker zk
zookeeper.connect=localhost:2182
## zookeeper , dead
zookeeper.session.timeout.ms =6000
## zookeeper
zookeeper.connection.timeout.ms =6000
## zookeeper follower leader
zookeeper.sync.time.ms =2000
## zookeeper offset 。smallest : largest: anythingelse:
auto.offset.reset = largest
## socket , :max.fetch.wait + socket.timeout.ms.
socket.timeout.ms=30*1000
## socket
socket.receive.buffer.bytes=64*1024
##
fetch.message.max.bytes =1024*1024
## offset zookeeper, Consumer zookeeper offset
auto.commit.enable =true
##
auto.commit.interval.ms =60*1000
## , fetch.message.max.bytes
queued.max.message.chunks =10
## consumer group , reblance, partitions
## consumer , consumer partition , zk
##"Partition Owner registry" , consumer ,
## , .
rebalance.max.retries =4
##
rebalance.backoff.ms =2000
## leader
refresh.leader.backoff.ms
## server , ,
fetch.min.bytes =1
## (fetch.min.bytes) ,
fetch.wait.max.ms =100
## ,
consumer.timeout.ms = -1
転載先:https://www.cnblogs.com/fameg/p/9827169.html