kafkaローカルミラー書き込み成功編成サービス書き込み失敗報告ミスMessage contents does not match its CRC
8463 ワード
背景
再構築pubsubv 2の開発中にテストしたところ、ローカルkafkaミラーを接続して正常に書き込まれ、私たちのテストサーバに接続したdevopsでミラーが失敗したことがわかりました.エラーメッセージ:
kafka: Failed to produce message to topic Asys.i.bench.D_Asys.i.bench.D: Message contents does not match its CRC.
プロセスの追跡
問題の原因
ローカル書き込みに成功し、リモートテスター書き込みに成功しなかったのは、ミラー起動時のパラメータが異なるためです.私が編成したkafkaミラー起動環境変数は以下のように設定されています.
KAFKA_PORT='23310'
KAFKA_ADVERTISED_HOST_NAME='KSSCS40000H31904120007'
KAFKA_ZOOKEEPER_CONNECT='KSSCS40000H31904120007:23010'
KAFKA_BROKER_ID='1'
KAFKA_LOG_DIRS='/kafka'
KAFKA_AUTO_CREATE_TOPICS_ENABLE='false'
KAFKA_LOG_CLEANUP_POLICY='compact'
KAFKA_LOG_CLEANER_MIN_COMPACTION_LAG_MS='604800000'
KAFKA_HEAP_OPTS='-Xms6g -Xmx6g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80'
JMX_PORT='23311'
KAFKA_JMX_OPTS='-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=127.0.0.1 -Dcom.sun.management.jmxremote.rmi.port=23311'
その中で問題になったのがKAFKA_LOG_CLEANUP_POLICY=‘compact’という環境変数.kafkaのデフォルトの構成ではKAFKA_LOG_CLEANUP_POLICY=「delete」で、ローカルで起動するときにデフォルトの構成を使用します.送信コードフラグメントは次のように書かれています.
kafkaTopic := getKafkaTopicName(sub.TopicName, sub.Name)
for _, message := range messages {
if !publishFilter(sub, message) {
continue
}
value, err := proto.Marshal(message)
if err != nil {
continue
}
msg := &sarama.ProducerMessage{Topic: kafkaTopic, Value: sarama.StringEncoder(string(value))}
select {
case producer.Input() msg:
counterPublishMsgOfSubcription.With(prometheus.Labels{topicLabelName: sub.TopicName, subscriptionLabelName: sub.Name}).Inc()
case ctx.Done():
break
}
}
送信に失敗した理由は
msg := &sarama.ProducerMessage{Topic: kafkaTopic, Value: sarama.StringEncoder(string(value))}
のProducerMessageにはKeyというフィールドが記入されていません.私たちがメッセージを送信するのはkeyのhash形式ではなくランダムな形式を採用しているので、このフィールドは記入しなくてもいいと思います.一つは何を記入するか分からないことです.もう一つは使っていないと思います.ちょうどこの役に立たないのは踏んだ穴だ.KAFKA_LOG_CLEANUP_POLICY=‘compact’の場合、kafkaの削除ポリシーはkey保存で圧縮されます.そのため、書き込まれたメッセージにKeyというフィールドがないとエラーが報告されます(このエラー情報は原因とあまり一致していません.ネット上ではCRCエラーが必ずしも説明と一致していないという議論もあります).
解決策
問題の所在が分かったら、送信コードはそれに応じて変更されます.
msg := &sarama.ProducerMessage{Topic: kafkaTopic, Value: sarama.StringEncoder(string(value)), Key: sarama.StringEncoder(message.Id)}
その後、送信に成功し、問題が解決しました.