カフカの疑惑について
2401 ワード
Kafka Confusion
1.kafka
(1)consumerとpartitionは一対多の関係
consumerグループの1つのconsumerは複数のpartitionを消費することができるが、1つのpartitionは1つのconsumerによって消費されるしかない.kafkaの設計は1つのpartitionで同時化することが許されないからだ.消費の同時性を高めるためにはpartitionの数を増やさなければならない.
consumerがpartitionより多い場合、一部のconsumerはpartitionに消費できないことがあります.したがってpartitionの数をconsumerの数以上にします.合理的な導入案は、最初はpartitionの数をできるだけ多く(通常は2の整数乗)割り当て、その後は拡張のためにconsumerの数を直接増やすことです.
partition数がconsumer数より多い場合、kafkaはできるだけconsumer消費当たりのpartition数を均衡させ、consumer負荷の均衡の目的を達成する.以下の言葉(kafka公式ドキュメントからhttp://kafka.apache.org/documentation.html):
The consumers in a group divide up the partitions as fairly as possible, each partition is consumed by exactly one consumer in a consumer group.
次のリンクで実験を行いました.
http://www.jasongj.com/2015/08/09/KafkaColumn4/
2.saram
github.com/shopify/saramaはgo言語で実装されたkafka clientライブラリである.
(1)SyncProducerはAsyncProducerに対するパッケージである
SyncProducerはメッセージを送信するたびに、結果を返すのを待ってから次のメッセージを送信します.したがって、SyncProducerでは一括送信はサポートされていません.
(2)AsyncProducerのInput()とSuccesses()のブロック問題
AsyncProducerの2つの方法:Input()はメッセージを書き込むためのchannelを返し、Successes()は送信に成功したメッセージを収集するためのchannel(Errors()は送信に失敗したメッセージを収集するために使用される)を返す.アプリケーションは、1つのgoroutineでメッセージを絶えずInput()に書き込み、もう1つのgoroutineでSuccesses()とErrors()から送信結果を読み出して非同期送信を実現することができる.この2つの操作はfor selectにも書くことができます.
for {
select {
case producer.Input() <- message:
// do something
case message = <- producer.Successes():
// do something
case err := <- producer.Errors()
// do something
}
}
saramaだけConfig.Producer.Return.Successesがtrueに設定されている場合、producer.Successes()で読み込みます.また、このパラメータがtrueである場合、producerを読み出す必要がある.Successes()、そうでなければproducer.successes channelがいっぱいになり、producer.input channelもいっぱいでproducer.と書きますInput()の時は詰まっていました.
3.consumer group
github.com/wvanbergen/kafka/consumergroupはsaramaに基づいて開発されたconsumer groupの拡張である.saramaは現在consumerグループ機能をサポートしていません.
(1)consumergroupパッケージはoffsetをkafkaではなくzookeeperにコミットする.初期化(join group)時にzookeeperアドレスが必要です.
(2)消費開始点の配置
:
Offsets.Initial : , sarama.OffsetOldest sarama.OffsetNewest。
Offsets.ResetOffsets : bool 。 ,true ,false 。
pdos-server Offsets.Initial=sarama.OffsetNewest,Offsets.ResetOffsets=true。