KAFKA 0.10バージョン消費者の例
9870 ワード
以前は0.8バージョンのkafkaを使用していましたが、この間はお客様の現場で0.10バージョンを使用していました.apiには多くの修正点があるので、0.10バージョンの消費者コードを記録します.コードの詳細:
以上の内容は参考までに!
作者は竹君、転送は出典を明記してください!~
public class KafkaSource {
public void start() {
// topic
String testTopic = "topic_name";
// ,zookeeper,kafka
String zookeeper = "192.168.168.168:2181";
String consumerGroup = "consumer_group_name";
String bootstrapServers = "192.168.168.168:9092";
// kafka
Properties properties = getKafkaProperties(zookeeper, consumerGroup, bootstrapServers);
//0.10
TestConsumerRebalanceListener rebalanceListener = new TestConsumerRebalanceListener();
// consumer
KafkaConsumer<Object, Map<String, Object>> testConsumer = new KafkaConsumer<>(properties);
testConsumer.subscribe(Collections.singletonList(testTopic), rebalanceListener);
// kafka , queue
while (true) {
ConsumerRecords<Object, Map<String, Object>> records = testConsumer.poll(1000);
for (ConsumerRecord<Object, Map<String, Object>> record : records) {
Map<String, Object> value = record.value();
//value ,
System.out.println(" ==========="+ value );
}
kafkaConsumer.commitSync();
}
}
public void stop() {
if (consumer != null) {
consumer.stop();
}
}
private static class TestConsumerRebalanceListener implements ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection partitions) {
System.out.println("Called onPartitionsRevoked with partitions:" + partitions);
}
@Override
public void onPartitionsAssigned(Collection partitions) {
System.out.println("Called onPartitionsAssigned with partitions:" + partitions);
}
}
//--- kafka , , , kafka0.10 api---
private Properties getKafkaProperties(String zookeeper, String consumerGroup, String bootstrapServers) {
if (StringUtils.isAnyBlank(zookeeper, consumerGroup)) {
throw new RuntimeException("zookeeper and consumer group configuration can not be null");
}
Properties props = new Properties();
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// , key
props.put("value.deserializer", MapDecoder.class.getName());
props.put("group.id", consumerGroup);
//offset ,earliest lastest
props.put("auto.offset.reset", "earliest");
props.put("security.protocol", "PLAINTEXTSASL");
props.put("bootstrap.servers", bootstrapServers);
return props;
}
}
以上の内容は参考までに!
作者は竹君、転送は出典を明記してください!~