ドメイン名解析異常によるKafkaメッセージの送信に失敗しましたが、異常はありません.
4672 ワード
背景
ビジネス上の必要性のため、最近、あるセグメントにまたがるサービスが導入され、あるセグメントのアプリケーションサーバから別のセグメントのKafkaクラスタにメッセージが書き込まれ、アプリケーションサーバとKafkaクラスタの間にネットワークが開通し、telnetの結果、対応するポート間の接続が正常であることが明らかになった.
初回起動後、次の方法が正常に実行されていることが判明し、タイムアウト閉塞動作は発生しなかったが、Kafka消費者は送信されたメッセージを受信できず、実際にKafkaクラスタに書き込まれていない疑いがある.
ソーストラッキング
実際、ソースコードを追跡すると、Kafkaクライアントが送信する際、スループットを向上させるためにbatch非同期送信メカニズムが採用されており、実際にメッセージを送信する際に以下の流れに従っていることがわかります.
1.WALのような保留中のメッセージを本プロセスメモリに書き込む
2.保留中のメッセージが一定数に達するかlingerを超える.ms以降、Sender呼び出しRecordBatchにより一括送信し、異常情報を記録する
送信プロセスは非同期であるため,送信時に異常は投げ出されず,メッセージが正常に送信されたと勘違いする.
実際、非同期送信方式のセットとして、Kafkaはコールバックインタフェースを提供し、クライアントがメッセージ送信状態を表示するために、関数の原型は以下の通りである.
Callbackインタフェースを見てみましょう
デバッグ例外
冗長なコメントを除いて、送信中に異常が発生した場合、コールバックインタフェースで異常を取得できることがわかります.
アプリケーションを再度オンラインにすると,アプリケーションが異常を大量に放出していることが分かった.異常内容は「Expiring xxx records for xxx due to x ms has passed since batch creation plus linger time」です.
このように、メッセージはKafka brokerに実際に送信されず、バックグラウンドで静かに捨てられた.ネットワークが開通しているのにメッセージを送信できないのは、原因をさらに掘り起こす必要がある.
深く掘り下げる
アプリケーションのログをDEBUGに呼び出し、アプリケーションの起動からのログを観察すると、KafkaProducerは初期化時に、topic情報、partition情報、brokerノード情報、topic/partitionとノードの対応関係など、指定したbrokersからmeta情報を取得しようとしていることがわかります.
返されるmeta情報を見ると、brokerノード情報の返される値がホストドメイン名であることが問題です.
その理由は次のとおりです.
1.KafkaProducer初期化時に指定されたipからmeta informationを取得しようとする
2.取得したmeta infoに基づいて指定したbrokerにメッセージを送信
2つのセグメント間のDNS情報が同期していないため、アプリケーションが存在するセグメントがbrokerのドメイン名を解析できないため、メッセージの送信に失敗し、DNSが変更できない場合、アプリケーションノードの/etc/hostsファイルにbrokerノードのドメイン名とipマッピングを追加し、問題を解決する.
に続く
Kafkaが対外的に提供するmeta infoにおけるノードアドレス情報は、クラスタ起動時に書き込まれたzookeeperの登録情報を直接読み出すものであり、情報の最終ソースはserverである.properties.Kafka公式文書によるとhttp://kafka.apache.org/0101/documentation.html#brokerconfigsをクリックして、特定のグレードを検索します.
1.advertisedを最初に読み込む.Listenersの値
2.1の値が空の場合はadvertisedを使用します.host.name & advertised.port zookeeperに登録
KafkaProducerの送信モデルは、バージョン0.9以降は非同期送信モード(スループットアップ)がデフォルトである、同期モードを使用する場合はsend(ProducerRecord producerRecord)メソッドを取得したfutureが戻る後にfutureを使用することができる.get()メソッドは、次のようにブロックされます.
ミドルウェアの動作メカニズムと詳細については、問題を迅速に特定するには、より深く理解する必要があります.
ビジネス上の必要性のため、最近、あるセグメントにまたがるサービスが導入され、あるセグメントのアプリケーションサーバから別のセグメントのKafkaクラスタにメッセージが書き込まれ、アプリケーションサーバとKafkaクラスタの間にネットワークが開通し、telnetの結果、対応するポート間の接続が正常であることが明らかになった.
初回起動後、次の方法が正常に実行されていることが判明し、タイムアウト閉塞動作は発生しなかったが、Kafka消費者は送信されたメッセージを受信できず、実際にKafkaクラスタに書き込まれていない疑いがある.
producer.send(record)
ソーストラッキング
実際、ソースコードを追跡すると、Kafkaクライアントが送信する際、スループットを向上させるためにbatch非同期送信メカニズムが採用されており、実際にメッセージを送信する際に以下の流れに従っていることがわかります.
1.WALのような保留中のメッセージを本プロセスメモリに書き込む
2.保留中のメッセージが一定数に達するかlingerを超える.ms以降、Sender呼び出しRecordBatchにより一括送信し、異常情報を記録する
送信プロセスは非同期であるため,送信時に異常は投げ出されず,メッセージが正常に送信されたと勘違いする.
実際、非同期送信方式のセットとして、Kafkaはコールバックインタフェースを提供し、クライアントがメッセージ送信状態を表示するために、関数の原型は以下の通りである.
@Override
public Future send(ProducerRecord record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
Callbackインタフェースを見てみましょう
public interface Callback {
/**
* A callback method the user can implement to provide asynchronous handling of request completion. This method will
* be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be
* non-null.
* @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error
* occurred.
* @param exception The exception thrown during processing of this record. Null if no error occurred.
* Possible thrown exceptions include:
*
* Non-Retriable exceptions (fatal, the message will never be sent):
*
* InvalidTopicException
* OffsetMetadataTooLargeException
* RecordBatchTooLargeException
* RecordTooLargeException
* UnknownServerException
*
* Retriable exceptions (transient, may be covered by increasing #.retries):
*
* CorruptRecordException
* InvalidMetadataException
* NotEnoughReplicasAfterAppendException
* NotEnoughReplicasException
* OffsetOutOfRangeException
* TimeoutException
* UnknownTopicOrPartitionException
*/
public void onCompletion(RecordMetadata metadata, Exception exception);
}
デバッグ例外
冗長なコメントを除いて、送信中に異常が発生した場合、コールバックインタフェースで異常を取得できることがわかります.
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
logger.error("Caught exception ", exception);
}
}
});
アプリケーションを再度オンラインにすると,アプリケーションが異常を大量に放出していることが分かった.異常内容は「Expiring xxx records for xxx due to x ms has passed since batch creation plus linger time」です.
このように、メッセージはKafka brokerに実際に送信されず、バックグラウンドで静かに捨てられた.ネットワークが開通しているのにメッセージを送信できないのは、原因をさらに掘り起こす必要がある.
深く掘り下げる
アプリケーションのログをDEBUGに呼び出し、アプリケーションの起動からのログを観察すると、KafkaProducerは初期化時に、topic情報、partition情報、brokerノード情報、topic/partitionとノードの対応関係など、指定したbrokersからmeta情報を取得しようとしていることがわかります.
返されるmeta情報を見ると、brokerノード情報の返される値がホストドメイン名であることが問題です.
その理由は次のとおりです.
1.KafkaProducer初期化時に指定されたipからmeta informationを取得しようとする
2.取得したmeta infoに基づいて指定したbrokerにメッセージを送信
2つのセグメント間のDNS情報が同期していないため、アプリケーションが存在するセグメントがbrokerのドメイン名を解析できないため、メッセージの送信に失敗し、DNSが変更できない場合、アプリケーションノードの/etc/hostsファイルにbrokerノードのドメイン名とipマッピングを追加し、問題を解決する.
に続く
Kafkaが対外的に提供するmeta infoにおけるノードアドレス情報は、クラスタ起動時に書き込まれたzookeeperの登録情報を直接読み出すものであり、情報の最終ソースはserverである.properties.Kafka公式文書によるとhttp://kafka.apache.org/0101/documentation.html#brokerconfigsをクリックして、特定のグレードを検索します.
1.advertisedを最初に読み込む.Listenersの値
2.1の値が空の場合はadvertisedを使用します.host.name & advertised.port zookeeperに登録
KafkaProducerの送信モデルは、バージョン0.9以降は非同期送信モード(スループットアップ)がデフォルトである、同期モードを使用する場合はsend(ProducerRecord producerRecord)メソッドを取得したfutureが戻る後にfutureを使用することができる.get()メソッドは、次のようにブロックされます.
Future future = producer.send(record);
future.get();
ミドルウェアの動作メカニズムと詳細については、問題を迅速に特定するには、より深く理解する必要があります.