KafkaProducer-kafkaプロバイダーJavaクライアント

12164 ワード

転載先:https://blog.csdn.net/cjf_Wei/article/details/77920435 KafkaProducer(org.apache.kafka.clients.producer.KafkaProducer)は、kafkaクラスタにデータを送信するJavaクライアントである.Javaクライアントはスレッドが安全で、複数のスレッドが同じproducerインスタンスを共有することができ、通常、複数のスレッドでスレッドごとにインスタンスを作成するよりも高速です.本明細書で説明する内容はkafkaの公式ドキュメントから来ており、詳細はKafkaProducerを参照して、以下に簡単な例を示し、その後、パラメータについて説明する.
package com.test.kafkaProducer;

import java.util.List;
import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;

public class TestProducer {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.137.200:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //        
        String topic = "mytopic";
        Producer procuder = new KafkaProducer(props);
        for (int i = 1; i <= 10; i++) {
            String value = "value_" + i;
            ProducerRecord msg = new ProducerRecord(topic, value);
            procuder.send(msg);
        }
        //  topic     
        List partitions = new ArrayList() ;
        partitions = procuder.partitionsFor(topic);
        for(PartitionInfo p:partitions)
        {
            System.out.println(p);
        }

        System.out.println("send message over.");
        procuder.close(100,TimeUnit.MILLISECONDS);
    }
}

producerは、送信されるメッセージを保存するためのバッファプールを含み、バッファ内のメッセージはkafkaクラスタに送信されていないメッセージである.最下位レベルのkafka I/Oスレッドは、バッファプール内のメッセージを要求に変換してクラスタに送信する役割を果たします.Produceが終了したときにclose()メソッドが呼び出されなかった場合、これらのリソースが漏洩します.消費者を確立するための関連パラメータの説明とそのデフォルト値はproducerconfigsを参照してください.ここでは、コードで使用されるいくつかのパラメータを説明します.bootstrap.servers:初期化時にkafkaクラスタにリンクを確立し、host:port形式で、host 1:port 1、host 2:port 2をカンマで区切る.acks:生産者はserver側がメッセージを受信した後、フィードバック確認の尺度を行い、主にメッセージの信頼性伝送に使用する必要がある.acks=0は、生産者がserverからの確認を必要としないことを示す.acks=1は、サーバ側がメッセージを保存するとackを送信することができ、他のfollowerロールがメッセージを受信するまで待つ必要はありません.acks=all(or acks=-1)は、server側がすべてのコピーが受信されるのを待ってから確認を送信することを意味する.retries:生産者の送信に失敗した後、再試行の回数batch.size:複数のメッセージが同じpartitionに送信されると、この値は生産者の一括送信メッセージのサイズを制御し、一括送信は生産者からサービス側への要求数を減らし、クライアントとサービス側の性能を向上させるのに役立つ.linger.ms:デフォルトでは、バッファのスペースが切れていない場合でも、バッファのメッセージはすぐにサービス側に送信されます.この値は0より大きい値に設定することができ、送信者はしばらく待ってからサービス側に要求を送信し、各要求ができるだけ多くのバッチメッセージを送信できるようにすることができる.batch.sizeとlinger.msは、クライアントができるだけ多くのメッセージを送信するように要求する2つのメカニズムであり、それらは共存して使用することができ、衝突しない.buffer.memory:生産者バッファのサイズ.まだサーバ側に送信されていないメッセージを保存します.生産者の送信速度がメッセージがサーバ側に送信される速度より大きい場合、バッファは消費されます.key.serializer,value.serializerは、ユーザーが提供するkey値とvaule値をバイトにシーケンス化する方法を説明します.
**
kafkaクライアントのAPI
**KafkaProducerオブジェクトのインスタンス化方法は、map形式のキー値ペアまたはPropertiesオブジェクトを使用してクライアントのプロパティを構成できます.
/*
 *keySerializer:    key       ,      Serializer  
 *valueSerializer:    value       ,      Serializer  
 */
public KafkaProducer(Map configs);
public KafkaProducer(Map configs, Serializer keySerializer,Serializer valueSerializer);
public KafkaProducer(Properties properties);
public KafkaProducer(Properties properties, Serializer keySerializer,Serializer valueSerializer);

**
メッセージ送信方法send()
**
/*
 *record:key-value        
 *callback:       borker         
 */
public Future send(ProducerRecord record); // Equivalent to send(record, null)
public Future send(ProducerRecord record,Callback callback);

sendメソッドは、バッファ内のメッセージをbrokerの指定topicに非同期で送信する責任を負います.非同期送信とは、送信するI/Oキャッシュにメッセージを格納した後、すぐに戻る方法であり、パラレルでブロックされていないより多くのメッセージの送信を実現することができる.sendメソッドの戻り値は、メッセージが送信されるpartition情報、メッセージのoffset、タイムスタンプを含むRecordMetadataタイプです.sendはFutureオブジェクトを返すため、get()メソッドを呼び出すと、関連する送信要求が完了し、メタデータ情報が返されるまでブロックされます.または送信時に異常を投げ出して終了します.送信をブロックする方法は次のとおりです.
String key = "Key";
String value = "Value";
ProducerRecord<String, String> record = new ProducerRecord<String, String>(key, value);
producer.send(record).get();

コールバック関数と非同期送信方式を活用して、メッセージ送信の進捗状況を確認できます.
ProducerRecord record = new ProducerRecord("the-topic", key, value);
producer.send(myRecord, new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        if(e != null) {
                            e.printStackTrace();
                        } else {
                            System.out.println("The offset of the record we just sent is: " + metadata.offset());
                        }
                    }
                });

flushキャッシュデータを直ちに送信
public void flush();

このメソッドを呼び出すと、バッファのすべてのメッセージが直ちに送信され(linger.msパラメータが0より大きく設定されている場合でも)、これらの関連メッセージの送信要求が完了するまでブロックされます.flushメソッドの前置条件は、以前に送信されたすべてのメッセージリクエストが完了したことです.1つのリクエストが完了したとみなされるのは、acksパラメータ構成項目に基づいて対応する確認が受信されたか、送信中に例外が投げ出されて失敗したことを意味します.次の例は、1つのtopic消費から別のtopicに送信されることを示しています.flushメソッドは、以前に送信されたメッセージが確実に完了したことを確認するために便利な方法を提供します.
for(ConsumerRecord<String, String> record: consumer.poll(100))
    producer.send(new ProducerRecord("my-topic", record.key(), record.value());
producer.flush();  //           
consumer.commit(); //           

partitionsFor
//    topic partition     
public List partitionsFor(String topic);

close
//  producer,      ,             
public void close();//  equivalent to close(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
public void close(long timeout,TimeUnit timeUnit); //  ,     timeout  ,