kafka消費問題処理記録

11947 ワード

社内データ交換にkafkaを使用していますが、最近、以下の異常が報告されることがあります.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:775)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:726)

まず、この問題がどのような状況で発生するかを分析します.まずkafkaデータ消費の正常な過程を見てみましょう.①消費者購読テーマ→②生産者生成データ→③消費者受信データ→④消費者処理データ→⑤消費者提出オフセット量;
正常な消費はもちろん問題ありませんが、想像すると、④このステップの処理が遅すぎて、長い間⑤このステップを実行していなかったり、消費者が問題を起こして直接クラッシュしたりしたら、この時サーバーはどのように処理すればいいのでしょうか.
消費者に割り当てられたデータにかかわらず、このデータは永久に失われます.このような状況は明らかに容認できない.kafkaはタイムアウト時間を導入することによってこの状況を処理し、このタイムアウト時間パラメータはmax.pollである.interval.msは、③→⑤からの期間を表し、タイムアウト時間を超えた場合、サーバはこの消費者に問題があると判断し、その消費者に元々割り当てられていたデータを再割り当てする.その後、消費者がオフセット量をコミットすると、サービス側は消費者にエラーを返します.これが上記のエラーの原因です.
サーバが消費者にレコードを割り当てるたびに、消費者とサーバの間のインタラクションが非常に頻繁になり、ネットワーク上のオーバーヘッドも非常に大きくなります.一般的に消費者は大量消費で、ネット上のオーバーヘッドを減らす.パラメータmax.pollを通過するレコードが毎回最大何件消費されるか.recordsで制御します.
以前の消費プロセスを見てみると、消費者がタイムアウトしてデータの重複配分を招くのはどのような状況ですか?以上の分析によれば、消費者が処理を完了しないまでタイムアウト時間が経過すると再配分につながる.タイムアウト時間が短く設定されているか、1回の最大処理のレコード数が非常に大きい場合は、この問題を引き起こす可能性があります.
以上、このような状況が発生した場合、以下の処理を行うことができる:1つは消費者の処理が遅いのはいったいどの過程なのかを明らかにし、この過程を集中的に最適化することであり、例えばマルチスレッド処理、MySqlの代わりにRedisを使用するなどの伝統的なデータベースなどである.二つ目は、最適化が完了した後に問題がある場合、タイムアウト時間と最大処理記録数の2つのパラメータを調整し、タイムアウト時間を大きくするか、最大処理記録数を小さくすることで、タイムアウトの確率を低減することができる.
会社の質問に戻り、2つのパラメータをプロファイルに書き込み(以前は直接書かれていなかった)、コンフィギュレーション・アイテムをロードするためにコンフィギュレーション・プロパティを使用するオブジェクトを定義します.初期化された消費者コードは次のとおりです.
	property = new Properties();
	property.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,serverNodes);
	property.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
	//property.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
	property.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
	property.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
	property.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
	property.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

	property.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 20000);
	property.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
	property.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 512 * 1024);

	property.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaProperties.getMaxPollIntervalMs());
	property.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaProperties.getMaxPollRecords());
	property.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 40000);

	consumer = new KafkaConsumer<>(property);

改造が終わった後、消費者が情報を受け取れないことに気づいた!デバッグの結果、new KafkaConsumer<>(property)では戻ってこないことがわかりました.一歩ずつデバッグしていくと、コンフィグ・デフクラスのparseTypeメソッドで、プロパティ・パラメータが予想と一致しない場合、次のように例外が放出されることがわかりました.
 case INT:
	if (value instanceof Integer) {
		return value;
	} else if (value instanceof String) {
		return Integer.parseInt(trimmed);
	} else {
		throw new ConfigException(name, value, "Expected value to be a 32-bit integer, but it was a " + value.getClass().getName());
	}

問題は、コンフィギュレーションクラスではプロパティタイプがlongであり、kafkaは両方のパラメータを要求するタイプがIntegerであるため、例外を放出することが明らかになった.コンフィギュレーションプロパティを使用してkafkaのコンフィギュレーションアイテムをロードする場合は、パラメータタイプが予想されるタイプと一致することに注意してください.各パラメータタイプはConsumerConfigで確認できます.