KAFKA 0.10バージョン消費者の例

9870 ワード

以前は0.8バージョンのkafkaを使用していましたが、この間はお客様の現場で0.10バージョンを使用していました.apiには多くの修正点があるので、0.10バージョンの消費者コードを記録します.コードの詳細:
public class KafkaSource {
  
    public void start() {
        //  topic  
        String testTopic = "topic_name";
        //        ,zookeeper,kafka     
        String zookeeper = "192.168.168.168:2181";
        String consumerGroup = "consumer_group_name";
        String bootstrapServers = "192.168.168.168:9092";
        //  kafka  
        Properties properties = getKafkaProperties(zookeeper, consumerGroup, bootstrapServers);
        //0.10   
        TestConsumerRebalanceListener rebalanceListener = new TestConsumerRebalanceListener();
        //   consumer
        KafkaConsumer<Object, Map<String, Object>> testConsumer = new KafkaConsumer<>(properties);
        testConsumer.subscribe(Collections.singletonList(testTopic), rebalanceListener);
      
        // kafka    ,    queue 
        while (true) {
        ConsumerRecords<Object, Map<String, Object>> records = testConsumer.poll(1000);
        for (ConsumerRecord<Object, Map<String, Object>> record : records) {
            Map<String, Object> value = record.value();
            //value       ,            
              System.out.println("      ==========="+ value );
        }
        kafkaConsumer.commitSync();
        }
    }

    public void stop() {
        if (consumer != null) {
            consumer.stop();
        }
    }

    private static class TestConsumerRebalanceListener implements ConsumerRebalanceListener {
        @Override
        public void onPartitionsRevoked(Collection partitions) {
            System.out.println("Called onPartitionsRevoked with partitions:" + partitions);
        }

        @Override
        public void onPartitionsAssigned(Collection partitions) {
            System.out.println("Called onPartitionsAssigned with partitions:" + partitions);
        }
    }

    //---  kafka  ,       ,       ,    kafka0.10 api---
    private Properties getKafkaProperties(String zookeeper, String consumerGroup, String bootstrapServers) {
        if (StringUtils.isAnyBlank(zookeeper, consumerGroup)) {
            throw new RuntimeException("zookeeper and consumer group configuration can not be null");
        }
        Properties props = new Properties();
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        //          ,       key     
        props.put("value.deserializer", MapDecoder.class.getName());
        props.put("group.id", consumerGroup);
        //offset    ,earliest lastest  
        props.put("auto.offset.reset", "earliest");
        props.put("security.protocol", "PLAINTEXTSASL");
        props.put("bootstrap.servers", bootstrapServers);
        return props;
    }
}

以上の内容は参考までに!
作者は竹君、転送は出典を明記してください!~