【kafka】消費者
5198 ワード
consumerは、フェイルオーバ後に消費を継続するために、自分が消費したoffsetをリアルタイムで記録する必要があります.
だから消費者を設計する時、offsetは考慮しなければならない問題です
カスタムストレージoffsetの消費者
だから消費者を設計する時、offsetは考慮しなければならない問題です
public static void consumerMsg() {
Properties props = new Properties();
// Kafka
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
// , group.id ,
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
// offset。 : ,
//earliest: ( )
//latest:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// offset=false。 offset :consumer.commitSync();
// offset=true. consumer.commitSync();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
//key、value fan
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
KafkaConsumer consumer = new
KafkaConsumer(props);
//
consumer.subscribe(Arrays.asList("firstTopic"));
while (true) {
// ( )
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records) {
System.out.print("offset:" + record.offset() + " key:" + record.key() + " value:" + record.value());
}
/** :
* offset, 。
* offset , ;
* offset, 。
*/
// , offset
consumer.commitSync();
//
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map offsets, Exception exception) {
if (exception != null) {
System.err.println(" offset failed:" + offsets);
}
}
});
}
}
カスタムストレージoffsetの消費者
private static Map currentOffset = new HashMap();
// offset
public static void consumerMsgSaveOffset() {
Properties props = new Properties();
// Kafka
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
// , group.id ,
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
// offset。 : ,
//earliest: ( )
//latest:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// offset=false。 offset :consumer.commitSync();
// offset=true. consumer.commitSync();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
//key、value fan
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
final KafkaConsumer consumer = new
KafkaConsumer(props);
ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
// Rebalance
@Override
public void onPartitionsRevoked(Collection partitions) {
commitOffset(currentOffset);
}
// // Rebalance
@Override
public void onPartitionsAssigned(Collection partitions) {
currentOffset.clear();
for (TopicPartition partition : partitions) {
consumer.seek(partition, getOffset(partition));
// offset
}
}
};
//
consumer.subscribe(Arrays.asList("firstTopic"), listener);
while (true) {
// ( )
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records) {
System.out.print("offset:" + record.offset() + " key:" + record.key() + " value:" + record.value());
}
commitOffset(currentOffset);
}
}
// offset
private static long getOffset(TopicPartition partition) {
return 0;
}
// offset
private static void commitOffset(Map currentOffset) {
}