kafka 0.10 client使用例

5000 ワード

シーケンス
本文は主にkafka 0.10 clientを簡単に使ってメッセージを送受信する方法を説明します
maven
    
            org.apache.kafka
            kafka-clients
            0.10.2.1
            
                
                    org.slf4j
                    slf4j-log4j12
                
                
                    log4j
                    log4j
                
                
                    slf4j-api
                    org.slf4j
                
            
        

log 4 jを使うならexcludeを使わなくてもいいです
producer
    @Test
    public void send(){
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,broker);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Producer producer = new KafkaProducer(props);
        for (int i = 0; i < 1000000; i++) {
            ProducerRecord record = new ProducerRecord(topic, Integer.toString(i),
                    Integer.toString(i));
            producer.send(record, new Callback() {

                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if(metadata != null) {
                        System.out.printf("Send record partition:%d, offset:%d, keysize:%d, valuesize:%d %n",
                                metadata.partition(), metadata.offset(), metadata.serializedKeySize(),
                                metadata.serializedValueSize());
                    }
                    if(exception != null) {
                        exception.printStackTrace();
                    }
                }

            });
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        producer.close();
    }

consume
    @Test
    public void receive(){
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer consumer = new KafkaConsumer<>(props);
        try{
            consumer.subscribe(Arrays.asList(topic));
            while (true) {
                ConsumerRecords records = consumer.poll(10000);
                records.forEach(record -> {
                    System.out.printf("client : %s , topic: %s , partition: %d , offset = %d, key = %s, value = %s%n", clientId, record.topic(),
                            record.partition(), record.offset(), record.key(), record.value());
                });
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            consumer.close();
        }
    }

0.8バージョンとは異なり、topicCountMapは不要ですThis client transparently handles the failure of Kafka brokers,and transparently adapts as topic partitions it fetches migrate within the cluster.The consumer is not thread-safe.
consumerマルチスレッドスキーム
  • 複数のconsumerのアプリケーションインスタンスを起動し、dockerおよびkubernetesを使用するシーンでは
  • が便利です.
  • 単一のアプリケーションインスタンスで、内部には複数のKafkaConsumerインスタンス
  • がある.
  • 単一アプリケーションインスタンス、単一KafkaConsumerインスタンス、マルチスレッド/非同期消費メッセージ
  • 個人の比較傾向の第1の方案、topicのpartitionは何個あって、consumer応用は何個の実例を起こしてスループットに対して大きくて、また処理の消費速度を加速して、それでは第3の方案を加えます
    doc
  • kafka-01020-document
  • 【オリジナル】Kafka Consumerマルチスレッドインスタンス
  • kafkaのconsumer消費能力が低い場合の処理案
  • をまとめる
  • 【オリジナル】kafkaのパーティション数とマルチスレッド消費
  • を検討する
  • Introducing the Kafka Consumer: Getting Started with the New Apache Kafka 0.9 Consumer Client