Kafka踏み込み記録

921 ワード

1.Java Client Consumer実装"--from-beginning":
KafkaConsumer JavaDoc の「Controlling The Consumer's Position」章では、「  seekToBeginning(Collection)  "「--from-beginning」という機能を実現できますが、この方法をどのように呼び出すかは教えてくれません.「subscribe」の後、すぐに「seekToBeginning」を呼び出すと「No current assignment for partition」というメッセージが表示されます.正しい呼び出し姿勢は「subscribe」を呼び出すときに、2番目の ConsumerRebalanceListener listener"このパラメータはsubscribeが成功した後にコールバックします.
consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection partitions) {

            }

            @Override
            public void onPartitionsAssigned(Collection partitions) {
                consumer.seekToBeginning(partitions);
            }
        });

上記のコードで「--from-beginning」機能を実現することができる.