kafkaローカルミラー書き込み成功編成サービス書き込み失敗報告ミスMessage contents does not match its CRC

8463 ワード

背景


再構築pubsubv 2の開発中にテストしたところ、ローカルkafkaミラーを接続して正常に書き込まれ、私たちのテストサーバに接続したdevopsでミラーが失敗したことがわかりました.エラーメッセージ:
kafka: Failed to produce message to topic Asys.i.bench.D_Asys.i.bench.D: Message contents does not match its CRC.

プロセスの追跡

  • このエラーに遭遇したことがあります.問題はclientがconfigを設定している間にkafkaのversionが正しく設定されていないからです.したがって、最初からバージョン設定を複数回変更しても効果はありません.
  • は、編成されたミラーがローカルミラーと異なる可能性があると判断したため、編成されたミラーをローカルに引っ張って起動し、要求し、書き込みに成功したことを発見した.さらに不思議なことに、同じミラーがローカルを起動しても正常に書き込むことができ、リモートではできません.
  • はネットワークの問題だと疑っています.しかし、pingの過遅延は数ミリ秒であり、すべての書き込み要求がすべてエラーであり、ネットワーク遅延によるものではない.
  • 同僚に助けを求めた後、解決策を見つけた.

  • 問題の原因


    ローカル書き込みに成功し、リモートテスター書き込みに成功しなかったのは、ミラー起動時のパラメータが異なるためです.私が編成したkafkaミラー起動環境変数は以下のように設定されています.
    KAFKA_PORT='23310'
    KAFKA_ADVERTISED_HOST_NAME='KSSCS40000H31904120007'
    KAFKA_ZOOKEEPER_CONNECT='KSSCS40000H31904120007:23010'
    KAFKA_BROKER_ID='1'
    KAFKA_LOG_DIRS='/kafka'
    KAFKA_AUTO_CREATE_TOPICS_ENABLE='false'
    KAFKA_LOG_CLEANUP_POLICY='compact'
    KAFKA_LOG_CLEANER_MIN_COMPACTION_LAG_MS='604800000'
    KAFKA_HEAP_OPTS='-Xms6g -Xmx6g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80'
    JMX_PORT='23311'
    KAFKA_JMX_OPTS='-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=127.0.0.1 -Dcom.sun.management.jmxremote.rmi.port=23311'
    

    その中で問題になったのがKAFKA_LOG_CLEANUP_POLICY=‘compact’という環境変数.kafkaのデフォルトの構成ではKAFKA_LOG_CLEANUP_POLICY=「delete」で、ローカルで起動するときにデフォルトの構成を使用します.送信コードフラグメントは次のように書かれています.
    kafkaTopic := getKafkaTopicName(sub.TopicName, sub.Name)
    for _, message := range messages {
       if !publishFilter(sub, message) {
          continue
       }
       value, err := proto.Marshal(message)
       if err != nil {
          continue
       }
       msg := &sarama.ProducerMessage{Topic: kafkaTopic, Value: sarama.StringEncoder(string(value))}
       select {
       case producer.Input()  msg:
          counterPublishMsgOfSubcription.With(prometheus.Labels{topicLabelName: sub.TopicName, subscriptionLabelName: sub.Name}).Inc()
       case ctx.Done():
          break
       }
    }
    

    送信に失敗した理由は
      msg := &sarama.ProducerMessage{Topic: kafkaTopic, Value: sarama.StringEncoder(string(value))}
    

    のProducerMessageにはKeyというフィールドが記入されていません.私たちがメッセージを送信するのはkeyのhash形式ではなくランダムな形式を採用しているので、このフィールドは記入しなくてもいいと思います.一つは何を記入するか分からないことです.もう一つは使っていないと思います.ちょうどこの役に立たないのは踏んだ穴だ.KAFKA_LOG_CLEANUP_POLICY=‘compact’の場合、kafkaの削除ポリシーはkey保存で圧縮されます.そのため、書き込まれたメッセージにKeyというフィールドがないとエラーが報告されます(このエラー情報は原因とあまり一致していません.ネット上ではCRCエラーが必ずしも説明と一致していないという議論もあります).

    解決策


    問題の所在が分かったら、送信コードはそれに応じて変更されます.
    	msg := &sarama.ProducerMessage{Topic: kafkaTopic, Value: sarama.StringEncoder(string(value)), Key: sarama.StringEncoder(message.Id)}
    

    その後、送信に成功し、問題が解決しました.