Kafka 0.8 consumer example

Introduction
This post briefly shows how to use the Kafka 0.8 client to consume a topic.
Maven

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.2</version>
        </dependency>

Client initialization
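import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

// zk, group, topic and consumerCount (an Integer) are defined elsewhere
Properties props = new Properties();
props.put("zookeeper.connect", zk);
// props.put("auto.offset.reset", "smallest");
props.put("group.id", group);
props.put("zookeeper.session.timeout.ms", "10000");
props.put("zookeeper.sync.time.ms", "2000");
props.put("auto.commit.interval.ms", "10000");
// how long ConsumerIterator.hasNext() waits; if unset, it blocks until a message arrives
props.put("consumer.timeout.ms", "10000");
props.put(org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");
ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(props);
ConsumerConnector consumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
// topic -> number of streams (consumer threads) to create for it
Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(topic, consumerCount);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector
        .createMessageStreams(topicCountMap);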
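
The "range" strategy configured above splits a topic's partitions into contiguous blocks, one block per consumer thread. The following is a minimal sketch of that idea, assuming partitions are numbered from 0 and threads are already in sorted order. The class and method names are hypothetical, and this is a simplified model, not Kafka's own code.

```java
import java.util.ArrayList;
import java.util.List;

class RangeAssignmentSketch {

    /**
     * Returns, for each consumer thread in order, the partition ids it
     * would own when partitions are handed out in contiguous ranges.
     */
    static List<List<Integer>> assign(int partitionCount, int threadCount) {
        if (threadCount <= 0) {
            throw new IllegalArgumentException("threadCount must be positive");
        }
        int perThread = partitionCount / threadCount; // base share of every thread
        int withExtra = partitionCount % threadCount; // the first threads get one more
        List<List<Integer>> result = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            int start = perThread * i + Math.min(i, withExtra);
            int count = perThread + (i < withExtra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int p = start; p < start + count; p++) {
                owned.add(p);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // 5 partitions shared by 3 threads
        System.out.println(assign(5, 3)); // [[0, 1], [2, 3], [4]]
        // 2 partitions shared by 3 threads: the last thread owns nothing
        System.out.println(assign(2, 3)); // [[0], [1], []]
    }
}
```

With more threads than partitions, the trailing threads end up with an empty list, so they would sit idle.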

Concurrent consumption
consumerMap.get(topic).forEach(stream -> {
    // one task per stream; pool is an ExecutorService created elsewhere
    pool.submit(new Runnable() {
        @Override
        public void run() {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            // how long it.hasNext() blocks depends on consumer.timeout.ms (default -1: wait forever)
            try {
                while (it.hasNext()) {
                    System.out.println(Thread.currentThread().getName() + " hello");
                    // on timeout it is hasNext() that throws, not next()
                    System.out.println(Thread.currentThread().getName() + ":" + new String(it.next().message()));
                }
            } catch (ConsumerTimeoutException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " end");
        }
    });
});

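Notes
The number of consumer instances multiplied by the number of consumer threads per instance should be <= the number of partitions of the topic. Any threads beyond that receive no partitions and are wasted.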
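
The rule above is simple arithmetic, so it can be checked before deploying. This is a small sketch under the assumption that every instance runs the same number of threads and all of them consume a single topic. The class and method names are hypothetical.

```java
class ConsumerSizingSketch {

    /**
     * Number of consumer threads that would receive no partition,
     * given the group layout and the topic's partition count.
     */
    static int idleThreads(int instances, int threadsPerInstance, int partitions) {
        int totalThreads = instances * threadsPerInstance;
        return Math.max(0, totalThreads - partitions);
    }

    public static void main(String[] args) {
        // 2 instances x 4 threads against 6 partitions: 2 threads are wasted
        System.out.println(idleThreads(2, 4, 6)); // 2
        // 2 instances x 3 threads against 6 partitions: every thread has work
        System.out.println(idleThreads(2, 3, 6)); // 0
    }
}
```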