ドメイン名解析異常によるKafkaメッセージの送信に失敗しましたが、異常はありません.

4672 ワード

背景
ビジネス上の必要性のため、最近、あるセグメントにまたがるサービスが導入され、あるセグメントのアプリケーションサーバから別のセグメントのKafkaクラスタにメッセージが書き込まれ、アプリケーションサーバとKafkaクラスタの間にネットワークが開通し、telnetの結果、対応するポート間の接続が正常であることが明らかになった.
初回起動後、次の方法が正常に実行されていることが判明し、タイムアウト閉塞動作は発生しなかったが、Kafka消費者は送信されたメッセージを受信できず、実際にKafkaクラスタに書き込まれていない疑いがある.
producer.send(record)

ソーストラッキング
実際、ソースコードを追跡すると、Kafkaクライアントが送信する際、スループットを向上させるためにbatch非同期送信メカニズムが採用されており、実際にメッセージを送信する際に以下の流れに従っていることがわかります.
1.WALのような保留中のメッセージを本プロセスメモリに書き込む
2.保留中のメッセージが一定数に達するかlingerを超える.ms以降、Sender呼び出しRecordBatchにより一括送信し、異常情報を記録する
送信プロセスは非同期であるため,送信時に異常は投げ出されず,メッセージが正常に送信されたと勘違いする.
実際、非同期送信方式のセットとして、Kafkaはコールバックインタフェースを提供し、クライアントがメッセージ送信状態を表示するために、関数の原型は以下の通りである.
    @Override
    public Future send(ProducerRecord record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }

Callbackインタフェースを見てみましょう
public interface Callback {

    /**
     * A callback method the user can implement to provide asynchronous handling of request completion. This method will
     * be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be
     * non-null.
     * @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error
     *        occurred.
     * @param exception The exception thrown during processing of this record. Null if no error occurred.
     *                  Possible thrown exceptions include:
     *
     *                  Non-Retriable exceptions (fatal, the message will never be sent):
     *
     *                  InvalidTopicException
     *                  OffsetMetadataTooLargeException
     *                  RecordBatchTooLargeException
     *                  RecordTooLargeException
     *                  UnknownServerException
     *
     *                  Retriable exceptions (transient, may be covered by increasing #.retries):
     *
     *                  CorruptRecordException
     *                  InvalidMetadataException
     *                  NotEnoughReplicasAfterAppendException
     *                  NotEnoughReplicasException
     *                  OffsetOutOfRangeException
     *                  TimeoutException
     *                  UnknownTopicOrPartitionException
     */
    public void onCompletion(RecordMetadata metadata, Exception exception);
}

デバッグ例外
冗長なコメントを除いて、送信中に異常が発生した場合、コールバックインタフェースで異常を取得できることがわかります.
                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception != null) {
                            logger.error("Caught exception ", exception);
                        }
                    }
                });

アプリケーションを再度オンラインにすると,アプリケーションが異常を大量に放出していることが分かった.異常内容は「Expiring xxx records for xxx due to x ms has passed since batch creation plus linger time」です.
このように、メッセージはKafka brokerに実際に送信されず、バックグラウンドで静かに捨てられた.ネットワークが開通しているのにメッセージを送信できないのは、原因をさらに掘り起こす必要がある.
深く掘り下げる
アプリケーションのログをDEBUGに呼び出し、アプリケーションの起動からのログを観察すると、KafkaProducerは初期化時に、topic情報、partition情報、brokerノード情報、topic/partitionとノードの対応関係など、指定したbrokersからmeta情報を取得しようとしていることがわかります.
返されるmeta情報を見ると、brokerノード情報の返される値がホストドメイン名であることが問題です.
その理由は次のとおりです.
1.KafkaProducer初期化時に指定されたipからmeta informationを取得しようとする
2.取得したmeta infoに基づいて指定したbrokerにメッセージを送信
2つのセグメント間のDNS情報が同期していないため、アプリケーションが存在するセグメントがbrokerのドメイン名を解析できないため、メッセージの送信に失敗し、DNSが変更できない場合、アプリケーションノードの/etc/hostsファイルにbrokerノードのドメイン名とipマッピングを追加し、問題を解決する.
に続く
Kafkaが対外的に提供するmeta infoにおけるノードアドレス情報は、クラスタ起動時に書き込まれたzookeeperの登録情報を直接読み出すものであり、情報の最終ソースはserverである.properties.Kafka公式文書によるとhttp://kafka.apache.org/0101/documentation.html#brokerconfigsをクリックして、特定のグレードを検索します.
1.advertisedを最初に読み込む.Listenersの値
2.1の値が空の場合はadvertisedを使用します.host.name & advertised.port zookeeperに登録
KafkaProducerの送信モデルは、バージョン0.9以降は非同期送信モード(スループットアップ)がデフォルトである、同期モードを使用する場合はsend(ProducerRecord producerRecord)メソッドを取得したfutureが戻る後にfutureを使用することができる.get()メソッドは、次のようにブロックされます.
                Future future = producer.send(record);
                future.get();

ミドルウェアの動作メカニズムと詳細については、問題を迅速に特定するには、より深く理解する必要があります.