新しいバージョンのZookeeperを使って、kafkaクラスタを構築します.

18452 ワード

インストールの簡単な説明
新しいバージョンのkafkaはzookeeperを持参していますが、実際にはzookeeperは完全に使用できます.この記事は自分のzookeeperを使ってkafkaクラスタを構築することを記録します.
1、kafkaのダウンロードについて
kafkaダウンロードページ:http://kafka.apache.org/downloads
2、kafkaのプロファイルを修正する
kafkaはカタログの下のconfigフォルダをインストールして、その配置ファイルのために、私達が修正したいのはserver.propertiesとzookeeper.propertiesです.
2.1、ログディレクトリの変更
最初にkafkaのログディレクトリとzookeeperデータディレクトリを修正します.この二つのデフォルトはtmpディレクトリに置いています.tmpディレクトリの内容は再起動とともに失われます.
server.properties:
    log.dirs=/opt/kafka-logs
    に変更
    log.dirs=/opt/kafka/logs
zookeeper.properties
    dataDir=/opt/zookeeper
    に変更
    dataDir=/opt/zookeeper/data
2.2、kafkaの配置
kafkaクラスタは、leaderを推選するのに便利なように、奇数台のサービスコンポーネントによって一般的にクラスタ化されており、ここでは3台を例にして、それぞれxxx.xx.xx.x.x.x.x.x.X 01、xxxx.xx.xx.xx.xx.03にサービスのipを表している.
(サーバIPはifconfigコマンドで閲覧可能)
server.propertiesを変更:
1、brook er.idを設置して、各brook erが唯一であることを保証します.1台目はデフォルトを0に変更しなくてもいいです.後ろの2台は修正が必要です.例えば1と2台に変更します.
2、num.partitionsを設置して、パーティション数は普通brookと一致しています.
3、advertised.listenersとlistenersを設定し、例えばlisteners=PLANTEXT://xxx.xxxx.xxxx.xx.9092
4、zookeeper.co nnectを設置し、3台のサービスzookeeper接続住所を配置し、例えばzookeeper.co nnect=xx.xxxx.xx.xx.xx.x.x.x.x.x.x.x.x.2:2181、xx.xxx.xx.xx 03:2181
zookeeper.propertiesを修正:
1、接続パラメータを設定し、以下の構成を追加します.
        tickTime=2000
        init Limit=10
        syncLimit=5
 
2、brooker Idのサービスアドレスを設定する
        server.0=xxx.xx.xx.x 01:2888:3888        server.1=xxx.xxx.xx.x 02:2888:3888        server.2=xxx.xx.xx.xx 03:2888:3888
zookeeperデータディレクトリにidの配置を追加します.
各台のサービスのzookeeperデータディレクトリにmyidファイルを追加し、サービスbrook er.id属性値を書き込みます.ここのカタログは/opt/zookeeper/dataです.
最初の台のbrook er.idは0のサービスです.このディレクトリの下で実行します.echo 0>myid
3、kafkaを起動する
kafka起動時にまずzookeeperを起動し、kafkaを起動する.闭じる时は逆に、とりあえずkafkaを闭じて、zookeeperを闭じます.
起動zookeeper:
        bin/zookeeper-server-start.sh config/zookeeper.properties&
起動kafka:
        bin/kafka-server-start.sh config/server.properties&
4、kafkaクラスタをテストする
4.1、最初のサービスでtestテーマを作成する
bin/kafka-topics.sh--create--topic test--zookeeper xx.xxx.xx.x 01:2181--replication-factor 3--partion 3
4.2、展示テーマ、テーマの作成成功を確認する
bin/kafka-topics.sh--list--zookeeperxx.xxx.xx.x.x 01:2181
4.3、生産者の創建
bin/kafka-consolie-producer.sh--brooker-listxx.xxx.xxx.xA:9092--topic test
4.4、消費者を創建し、他の2台のサービスでそれぞれ消費者を創建する.
bin/kafka-consolie-consumer.sh--zookeeper xx.xxx.xxx.x 02:2121--topic test--from-beginning
bin/kafka-consolie-consumer.sh--zookeeper xxx.xxx.xx.xx 03:2181--topic test--from-beginning
4.5、テストメッセージの発表と消費
IPがxxxx.xxx.x.x.x 01のサービス生産中にメッセージを入力して返送し、サービス02と03の中の消費者が受信したかどうか確認する.
5、その他
設定が完了したらポートや他の設定を変更する必要がありますが、エラーが発生しました.kafka(/opt/kafka/logs)とzookeeper(/opt/zookeeper/data)のバッファディレクトリの内容を確認して再起動することができます.
 
BROKER  
broker.id、log.dir、zookeeper.connect
## broker , 。 IP , broker.id consumers broker.id =1 ##kafka , /tmp/kafka-logs-1,/tmp/kafka-logs-2 log.dirs = /tmp/kafka-logs ## port =6667 ## , message.max.bytes =1000000 ## broker , num.network.threads =3 ## broker IO , num.io.threads =8 ## , , background.threads =4 ## IO , IO , , queued.max.requests =500 ##broker , , , , , ZK, host.name ## , , producers, consumers, broker , advertised.host.name ## , port advertised.port ## socket ,socket SO_SNDBUFF socket.send.buffer.bytes =100*1024 ## socket ,socket SO_RCVBUFF socket.receive.buffer.bytes =100*1024 ## socket , serverOOM,message.max.bytes socket.request.max.bytes, topic socket.request.max.bytes =100*1024*1024 ------------------------------------------- LOG ------------------------------------------- ## topic segment , segment , topic log.segment.bytes =1024*1024*1024 ## segment log.segment.bytes , segment topic log.roll.hours =24*7 ## :delete compact , , topic log.cleanup.policy = delete ## log.cleanup.policy , ## log.retention.bytes log.retention.minutes , , topic log.retention.minutes=7days , 1 log.cleanup.interval.mins=1 ## topic , topic = *log.retention.bytes 。-1 ## log.retention.bytes log.retention.minutes , , topic log.retention.bytes=-1 ## , log.cleanup.policy log.retention.check.interval.ms=5minutes ## log.cleaner.enable=false ## log.cleaner.threads =1 ## log.cleaner.io.max.bytes.per.second=None ## , , log.cleaner.dedupe.buffer.size=500*1024*1024 ## IO log.cleaner.io.buffer.size=512*1024 ## hash log.cleaner.io.buffer.load.factor =0.9 ## log.cleaner.backoff.ms =15000 ## , , , topic log.cleaner.min.cleanable.ratio=0.5 ## , , log.retention.minutes , 。 topic log.cleaner.delete.retention.ms =1day ## segment , topic log.index.size.max.bytes =10*1024*1024 ## fetch , offset , , , , log.index.interval.bytes =4096 ## log "sync" ## IO , " " ## , " " " " . ## , "fsync" (IO ) ## , "fsync" , client . ## server , fsync . log.flush.interval.messages=None ## log.flush.scheduler.interval.ms =3000 ## interval , . ## "fsync" , , ## , . log.flush.interval.ms = None ## log.delete.delay.ms =60000 ## , log.flush.offset.checkpoint.interval.ms =60000 ------------------------------------------- TOPIC ------------------------------------------- ## topic , false, topic auto.create.topics.enable =true ## topic , replication , broker default.replication.factor =1 ## topic , topic topic num.partitions =1 --replication-factor3--partitions1--topic replicated-topic : replicated-topic , broker 。 ---------------------------------- (Leader、replicas) ---------------------------------- ## partition leader replicas ,socket controller.socket.timeout.ms =30000 ## partition leader replicas , controller.message.queue.size=10 ## replicas partition leader , , replicas ISR(in-sync replicas), , replica.lag.time.max.ms =10000 ## follower leader , follower[ partition relicas] ## , follower leader , , replicas ## ,leader follower , replicas ## follower . ## broker , , . replica.lag.max.messages =4000 ##follower leader socket replica.socket.timeout.ms=30*1000 ## leader socket replica.socket.receive.buffer.bytes=64*1024 ## replicas replica.fetch.max.bytes =1024*1024 ## replicas leader , replica.fetch.wait.max.ms =500 ## fetch , leader , , replica.fetch.min.bytes =1 ## leader , follower IO num.replica.fetchers=1 ## replica replica.high.watermark.checkpoint.interval.ms =5000 ## broker , true, broker leader, broker controlled.shutdown.enable =false ## controlled.shutdown.max.retries =3 ## controlled.shutdown.retry.backoff.ms =5000 ## broker auto.leader.rebalance.enable =false ## leader , , leader.imbalance.per.broker.percentage =10 ## leader leader.imbalance.check.interval.seconds =300 ## offset offset.metadata.max.bytes ----------------------------------ZooKeeper ---------------------------------- ##zookeeper , , hostname1:port1,hostname2:port2,hostname3:port3 zookeeper.connect = localhost:2181 ## ZooKeeper , , , , zookeeper.session.timeout.ms=6000 ## ZooKeeper zookeeper.connection.timeout.ms =6000 ## ZooKeeper leader follower zookeeper.sync.time.ms =2000 topic , bin/kafka-topics.sh --zookeeper localhost:2181--create --topic my-topic --partitions1--replication-factor1--config max.message.bytes=64000--config flush.messages=1 bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --config max.message.bytes=128000 : bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --deleteConfig max.message.bytes

 
 
Consumer  

group.id、zookeeper.connect ## Consumer ID,broker group.id , group.id ## ID, , consumer.id ## ID , group.id client.id
= group id value ## zookeeper , hostname1:port1,hostname2:port2,hostname3:port3 broker zk zookeeper.connect=localhost:2182 ## zookeeper , dead zookeeper.session.timeout.ms =6000 ## zookeeper zookeeper.connection.timeout.ms =6000 ## zookeeper follower leader zookeeper.sync.time.ms =2000 ## zookeeper offset 。smallest : largest: anythingelse: auto.offset.reset = largest ## socket , :max.fetch.wait + socket.timeout.ms. socket.timeout.ms=30*1000 ## socket socket.receive.buffer.bytes=64*1024 ## fetch.message.max.bytes =1024*1024 ## offset zookeeper, Consumer zookeeper offset auto.commit.enable =true ## auto.commit.interval.ms =60*1000 ## , fetch.message.max.bytes queued.max.message.chunks =10 ## consumer group , reblance, partitions ## consumer , consumer partition , zk ##"Partition Owner registry" , consumer , ## , . rebalance.max.retries =4 ## rebalance.backoff.ms =2000 ## leader refresh.leader.backoff.ms ## server , , fetch.min.bytes =1 ## (fetch.min.bytes) , fetch.wait.max.ms =100 ## , consumer.timeout.ms = -1
 
転載先:https://www.cnblogs.com/fameg/p/9827169.html