【kafka】消費者

5198 ワード

consumerは、フェイルオーバ後に消費を継続するために、自分が消費したoffsetをリアルタイムで記録する必要があります.
だから消費者を設計する時、offsetは考慮しなければならない問題です
public static void consumerMsg() {
        Properties props = new Properties();
        //  Kafka  
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        //    ,   group.id   ,          
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        //      offset。        :     ,           
        //earliest:               (  )
        //latest:              
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //       offset=false。       offset  :consumer.commitSync();
        //      offset=true.      consumer.commitSync();
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        //key、value fan   
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        KafkaConsumer consumer = new
                KafkaConsumer(props);
        //          
        consumer.subscribe(Arrays.asList("firstTopic"));
        while (true) {
            //           (         )
            ConsumerRecords records = consumer.poll(100);
            for (ConsumerRecord record : records) {
                System.out.print("offset:" + record.offset() + " key:" + record.key() + " value:" + record.value());
            }

            /**  :
             *               offset,                   。
             *     offset    ,           ;
             *         offset,             。
             */
            //    ,          offset     
            consumer.commitSync();

            //    
            consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map offsets, Exception exception) {
                    if (exception != null) {
                        System.err.println("      offset failed:" + offsets);
                    }
                }
            });
        }
    }

カスタムストレージoffsetの消費者
 private static Map currentOffset = new HashMap();

    //      offset
    public static void consumerMsgSaveOffset() {
        Properties props = new Properties();
        //  Kafka  
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        //    ,   group.id   ,          
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        //      offset。        :     ,           
        //earliest:               (  )
        //latest:              
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //       offset=false。       offset  :consumer.commitSync();
        //      offset=true.      consumer.commitSync();
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        //key、value fan   
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        final KafkaConsumer consumer = new
                KafkaConsumer(props);

        ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {

            //      Rebalance     
            @Override
            public void onPartitionsRevoked(Collection partitions) {
                commitOffset(currentOffset);
            }

            // //      Rebalance     
            @Override
            public void onPartitionsAssigned(Collection partitions) {
                currentOffset.clear();
                for (TopicPartition partition : partitions) {
                    consumer.seek(partition, getOffset(partition));
                    //         offset       
                }
            }
        };

        //          
        consumer.subscribe(Arrays.asList("firstTopic"), listener);
        while (true) {
            //           (         )
            ConsumerRecords records = consumer.poll(100);
            for (ConsumerRecord record : records) {
                System.out.print("offset:" + record.offset() + " key:" + record.key() + " value:" + record.value());
            }
            commitOffset(currentOffset);
        }
    }

    //         offset
    private static long getOffset(TopicPartition partition) {
        return 0;
    }

    //            offset
    private static void commitOffset(Map currentOffset) {
    }