Kafka Producer API


この文章は崔元英の「Java付きapachecapkapplicationプログラミング」に基づいて書かれた.
作者の紹介
https://github.com/bjpublic/apache-kafka-with-java

プロデューサーAPI


前述の論文では,カフカをインストールし,ローカル端末上でエージェントでトポロジーマップを作成し,記録の伝送を試みた.
今回は、Javaを使用して、カフカクライアントライブラリのproductor APIを使用してproductorアプリケーションを作成し、実行してみます.
IntelliJを使用して実行します.
まず、New Projectボタンでsimple-Kafka-Productorの名前でプロジェクトを作成します.「自動化ツールの構築」(Java、Maven、Gradleなど)からGradleを選択します.

build.勾配の修正


他のセクションはすべて可能ですが、依存項目のセクションを変更する必要があります.本の上.
compile 'org.apache.kafka:kafka-clients:2.5.0'
compile 'org.slf4j:slf4j-simple:1.7.30'
そうですが、現在の時点(2022年01月03日)を基準にJDK 16を使用しており、最近のIntelliJバージョンとGradle 7.1バージョンではcompileメソッドがないため、エラーが出力されています.
compileコマンドをimplementationに変換し、良好に動作します.
implementation 'org.apache.kafka:kafka-clients:2.5.0'
implementation 'org.slf4j:slf4j-simple:1.7.30'
上のコードbuil.gradeファイルのdependenciesセクションに追加してリフレッシュし、外部ライブラリにKafkaとslf 4 jライブラリがインストールされていることを確認します.

Producerクラスの作成


著者githubに必要なソースコードはすでにアップロードされています.直接確認してみるのもいいですね.
orgはsimple-kafka-プロバイダ/src/main/javaフォルダにあります.exampleパッケージを作成し、SimpleProducer classファイルを作成しました.
自分のパッケージに従ってパッケージコード部分を修正した後、それをTOPIC NAMEを送信するテーマ名に変更し、BOOSTRAP SERVERS定数の「my-Kafka」部分を自分のサーバIPに合わせて修正すれば、正常に動作します.
// package name can be modified
package org.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class SimpleProducer {
    private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";

    public static void main(String[] args) {

        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        String messageValue = "testMessage";
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
        
        producer.send(record);
        logger.info("{}", record);
        producer.flush();
        producer.close();
    }
}
「test」という名前のトピックには、キー値null、値「testMessage」のレコードが送信されます.
鍵の値を指定してプロキシに送信する場合は、
ProducerRecord<String, String> record = new ProducerRecord<> ("topic", "key", "value");
これによりキー値をパラメータとして入れることができます.
トピックのどのパーティションに送信するかを決定するには、
ProducerRecord<String, String> record = new ProducerRecord<> ("topic", "partitionNo", "key", "value");
これにより、分割番号をパラメータとして使用できます.

Custom Partitioner


KafkaProducerオブジェクトにCustom Partifierクラスが追加されていない場合は、Kafka 2.4.0バージョンからUniformStickyPartifierがデフォルト値で追加されます.
以前のRoundRobbin Partifierの欠点を補ったパーティション化器は、データがすべて配置にパッケージされるのを待ってから、データをすべて同じパーティションにパッケージ化する方法です.
クライアント・パートナー・クラスを作成する場合は、クライアントが提供するパートナー・インタフェースを実装することによってクライアント・クラスを実装する必要があります.その後,実装されたクラスをKafkaプロダクションオブジェクトのパラメータに入れることで,クライアントパートナーを実装することができる.
カスタム例(作成者github上のコード)
package org.example;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;

public class CustomPartitioner  implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
                         Cluster cluster) {

        if (keyBytes == null) {
            throw new InvalidRecordException("Need message key");
        }
        if (((String)key).equals("Pangyo"))
            return 0;

        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }


    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public void close() {}
}
上記のように、Customクラスを作成し、パラメータをputメソッドとして生産者クラスファイルのプロパティオブジェクトに追加します.
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);
参考までに、上記の例は「Pangyo」という文字列であり、キー値である場合は0番パーティションのcustomパーティションクラスに格納する必要があります.

エージェントが正常に転送されているかどうかを確認するプロデューサー


KafkaProducerのsend()メールは、完全なオブジェクトを返します.get()メソッドを使用して完全オブジェクトをRecordMetadataクラスとして受信すると、送信データの結果を同期して受信できます.
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
RecordMetadata metadata = producer.send(record).get();
logger.info(metadata.toString());
伝送結果を非同期で受信するためには,Calbackインタフェースを実現するコールバック作成人間を記述する必要がある.

同期vs.非同期


同期は、転送結果を受信するたびに速度を遅くする可能性があります.毎回仲介者の伝送結果への応答を待つからだ.
しかし、非同期は必ずしも良いわけではない.データの順序が重要な場合は、使用できません.非同期で結果を待つ場合、次のデータ転送に成功し、以前に送信されたデータの結果が失敗すると、再転送がデータの順序を反転させる可能性があるからです.
そのため、状況に応じて使うべきです.