kafka 0.10 client使用例
5000 ワード
シーケンス
本文は主にkafka 0.10 clientを簡単に使ってメッセージを送受信する方法を説明します
maven
log 4 jを使うならexcludeを使わなくてもいいです
producer
consume
0.8バージョンとは異なり、topicCountMapは不要ですThis client transparently handles the failure of Kafka brokers,and transparently adapts as topic partitions it fetches migrate within the cluster.The consumer is not thread-safe.
consumerマルチスレッドスキーム複数のconsumerのアプリケーションインスタンスを起動し、dockerおよびkubernetesを使用するシーンでは が便利です.単一のアプリケーションインスタンスで、内部には複数のKafkaConsumerインスタンス がある.単一アプリケーションインスタンス、単一KafkaConsumerインスタンス、マルチスレッド/非同期消費メッセージ 個人の比較傾向の第1の方案、topicのpartitionは何個あって、consumer応用は何個の実例を起こしてスループットに対して大きくて、また処理の消費速度を加速して、それでは第3の方案を加えます
doc kafka-01020-document 【オリジナル】Kafka Consumerマルチスレッドインスタンス kafkaのconsumer消費能力が低い場合の処理案 をまとめる【オリジナル】kafkaのパーティション数とマルチスレッド消費 を検討する Introducing the Kafka Consumer: Getting Started with the New Apache Kafka 0.9 Consumer Client
本文は主にkafka 0.10 clientを簡単に使ってメッセージを送受信する方法を説明します
maven
org.apache.kafka
kafka-clients
0.10.2.1
org.slf4j
slf4j-log4j12
log4j
log4j
slf4j-api
org.slf4j
log 4 jを使うならexcludeを使わなくてもいいです
producer
@Test
public void send(){
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,broker);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Producer producer = new KafkaProducer(props);
for (int i = 0; i < 1000000; i++) {
ProducerRecord record = new ProducerRecord(topic, Integer.toString(i),
Integer.toString(i));
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(metadata != null) {
System.out.printf("Send record partition:%d, offset:%d, keysize:%d, valuesize:%d %n",
metadata.partition(), metadata.offset(), metadata.serializedKeySize(),
metadata.serializedValueSize());
}
if(exception != null) {
exception.printStackTrace();
}
}
});
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
producer.close();
}
consume
@Test
public void receive(){
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer consumer = new KafkaConsumer<>(props);
try{
consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords records = consumer.poll(10000);
records.forEach(record -> {
System.out.printf("client : %s , topic: %s , partition: %d , offset = %d, key = %s, value = %s%n", clientId, record.topic(),
record.partition(), record.offset(), record.key(), record.value());
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
consumer.close();
}
}
0.8バージョンとは異なり、topicCountMapは不要ですThis client transparently handles the failure of Kafka brokers,and transparently adapts as topic partitions it fetches migrate within the cluster.The consumer is not thread-safe.
consumerマルチスレッドスキーム
doc