kafka0.8消費者事例
2492 ワード
シーケンス
ここではkafka 0の使い方を簡単に示す.8のclientはtopicを消費します.
maven
クライアントの初期化
同時消費
注意事項
消費者インスタンス数*各インスタンスの消費スレッド数<=topicのpartition数、そうでないと余計な無駄になります.
ここではkafka 0の使い方を簡単に示す.8のclientはtopicを消費します.
maven
org.apache.kafka
kafka_2.10
0.8.2.2
クライアントの初期化
Properties props = new Properties();
props.put("zookeeper.connect", zk);
// props.put("auto.offset.reset","smallest");
props.put("group.id",group);
props.put("zookeeper.session.timeout.ms", "10000");
props.put("zookeeper.sync.time.ms", "2000");
props.put("auto.commit.interval.ms", "10000");
props.put("consumer.timeout.ms","10000"); // ConsumerIterator hasNext ,
props.put(org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");
ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(props);
ConsumerConnector consumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
Map topicCountMap = new HashMap();
topicCountMap.put(topic, consumerCount);
Map>> consumerMap = consumerConnector
.createMessageStreams(topicCountMap);
同時消費
consumerMap.get(topic).stream().forEach(stream -> {
pool.submit(new Runnable() {
@Override
public void run() {
ConsumerIterator it = stream.iterator();
//it.hasNext() consumer.timeout.ms , -1
try{
while (it.hasNext()) {
System.out.println(Thread.currentThread().getName()+" hello");
// hasNext , next
System.out.println(Thread.currentThread().getName()+":"+new String(it.next().message()));
}
}catch (ConsumerTimeoutException e){
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" end");
}
});
});
注意事項
消費者インスタンス数*各インスタンスの消費スレッド数<=topicのpartition数、そうでないと余計な無駄になります.