kafka異常をコミットorg.apache.kafka.clients.consumer.CommitFailedException

5324 ワード

一、背景


kafkaの使用バージョンは0.10.1.0で、今日kafkaConsumerをテストするとき、コードは以下の通りです.
public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.80.132:9092");
        props.put("group.id", "testId");
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "10000");
        props.put("max.poll.interval.ms", "3000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("kafka-topic-02"));
        try {
            while(true) {
                ConsumerRecords records = consumer.poll(100);
                System.out.println("********* 6s, **********");
                Thread.sleep(6000);
                System.out.println("*************** *****************");
                consumer.commitSync();
            }
        }catch(Exception e) {
            e.printStackTrace();
        }finally {
            consumer.close();
        }

実行すると、エラーが発生しました.エラーは、消費者グループがrebalanced操作を開始し、対応するパーティションを他の消費者に割り当てたため、手動でコミットしたときに失敗したことを意味します.
********* 6s, **********
*************** *****************
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:600)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:498)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1104)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1072)
    at com.tanjie.kafka.ConsumerDemo.main(ConsumerDemo.java:29)


公式サイトを見ると、パラメータmax.pollが見つかりました.interval.ms(公式サイトでデフォルト値3000)は、kafkaServer側pollメッセージからpoll()の呼び出し間の最大遅延を意味する.これにより、消費者がより多くのレコードを取得する前に空き時間を得ることができる時間量の上限が提供される.このタイムアウトが満了する前にpoll()が呼び出されなかった場合、ユーザは失敗し、消費者グループはパーティションを他の消費者に再割り当てするために再バランスすると考えられ、ここではThreadを設定する.sleep(6000) > max.poll.interval.ms値、すなわち私たちが手動で提出したとき、実際にパーティション情報は消費者グループ全体の他の消費者に割り当てられています.
上記の理由を理解して、修正も簡単で、第一にmax.pollを大きくすることです.interval.msの値ですが、実際の生産では、poll 100メッセージごとにN/sを処理する場合はmax.pollをテストすることができます.interval.ms値を>100*Nに設定すると、上記の異常は発生しません.
修正後のコードは以下の通りです.
public static void main(String[] args) {
		Properties props = new Properties();
		props.put("bootstrap.servers", "192.168.80.132:9092");
		props.put("group.id", "testId");
		props.put("enable.auto.commit", "false");
		props.put("session.timeout.ms", "10000");
		props.put("max.poll.interval.ms", "3000");
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
		KafkaConsumer consumer = new KafkaConsumer<>(props);
		consumer.subscribe(Arrays.asList("kafka-topic-02"));
		try {
			while(true) {
				ConsumerRecords records = consumer.poll(100);
				System.out.println("********* 2s, **********");
				Thread.sleep(2000);
				System.out.println("*************** *****************");
				for(ConsumerRecord record : records) {
					System.out.println("offest:" + record.offset() + ";value: " + record.value());
				}
				consumer.commitSync();
			}
		}catch(Exception e) {
			e.printStackTrace();
		}finally {
			consumer.close();
		}
	}

実行結果は次のとおりです.
********* 2s, **********
*************** *****************
********* 2s, **********
*************** *****************
offest:18;value: 3f85df8b-4ec6-4b9d-be22-0c269ead55a9
offest:19;value: 34d8b20d-eb24-4cd5-a249-75b41a6fda5b
offest:20;value: dcc4bd2f-912d-4fda-b141-7c759ea3dc24
offest:21;value: d9a7db66-d9b5-4c0f-9287-f0197a7dcdf0
offest:14;value: 2dfab504-da05-4f38-a7be-cb1e0c7a8ea9
offest:15;value: c3170f1b-bbe6-4c58-8fda-c7a548ca4e3c
offest:18;value: 966faa1c-4cf1-44cd-a63e-818940fcb26a
offest:19;value: 7bb5b5aa-bd74-4c2c-9008-58714028a882
offest:20;value: 0f79cacf-c558-4696-985c-0ea0949acd4e
offest:21;value: 471f54a0-529f-4f54-b9fb-706c042839e2
********* 2s, **********
*************** *****************
********* 2s, **********
*************** *****************
********* 2s, **********