KafkaはJavaを利用してデータの生産と消費の実例の教程を実現します。


前言
前の章でkafkaクラスタを構築する方法について述べたが、本記事ではどのように簡単にkafkaを使用するかを説明する。でも、kafkaを使う時、やはり簡単にkafkaを理解するべきです。
Kafkaの紹介
Kafkaは、消費者規模のウェブサイト内のすべての動作フローデータを処理することができる、高スループットの分散型発行購読メッセージシステムである。
Kafkaには以下のような特性があります。
  • は、時間的に複雑度がO(1)となるように、メッセージの持続性を提供し、TBレベル以上のデータに対しても定数時間の複雑さのアクセス性能を保証する。
  • 高スループット。非常に安価な商用マシンでも、毎秒100 K以上のメッセージを送信することができます。
  • は、Kafka Server間のメッセージパーティションおよび分散型消費をサポートし、各Partion内のメッセージの順序を保証しながら、送信する。
  • は、オフラインデータ処理とリアルタイムデータ処理を同時にサポートする。
  • Scale out:オンライン水平拡張をサポートします。
  • kafkaの用語
  • Broker:Kafkaクラスタは1つ以上のサーバを含み、このようなサーバはbrook erと呼ばれる。
  • Topic:Kafkaクラスタに配信された各メッセージには一つのカテゴリがあり、このカテゴリはTopicと呼ばれる。物理的に異なるTopicのメッセージは別々に記憶されています。論理的にTopicのメッセージは一つ以上のbrookに保存されていますが、ユーザはメッセージのTopicを指定するだけで、データがどこに保存されているかに関心がなく、データを生産または消費することができます。
  • Partation:Partationは物理的概念であり、各Topicは1つまたは複数のPartationを含む。
  • Producer:Kafka brookへのメッセージの発信を担当しています。
  • Consmer:メッセージ消費者は、Kafka brook erにメッセージのクライアントを読み取ります。
  • Consmer Group:各Consmerは特定のConsmer Groupに属します。
  • kafkaコアApp
    kafkaは4つの核心APIがあります。
  • アプリケーションは、producer APIを使用して、メッセージを1つ以上のtopicにリリースする。
  • アプリケーションは、consumer APIを使用して1つまたは複数のtopicを購読し、生成されたメッセージを処理する。
  • アプリケーションは、ストリームプロセッサとしてstrems APIを使用して、1つ以上のtopicから入力ストリームを消費し、出力ストリームを1つ以上のtopicに生成し、効果的に入力ストリームを出力ストリームに変換する。
  • connector APIは、重複して使用できる生産者または消費者の構築または実行を可能にし、topicを既存のアプリケーションまたはデータシステムにリンクする。
  • 例の図は以下の通りです

    kafkaの応用シーン
  • は、システムまたはアプリケーション間でデータを確実に取得できるリアルタイムストリームデータパイプを構築する。
  • は、リアルタイムストリームアプリケーションを構築し、データストリームを変換または応答することができる。
  • 以上、kafkaの公式文書を参照してください。
    開発準備
    kafkaのプログラムを開発するなら、何をすればいいですか?
    まず、kafka環境を構築した後、私達が考えるべきのは生産者ですか?それとも消費者ですか?つまりメッセージの送信者ですか?それとも受け入れ者ですか?
    しかし、本編では、生産者と消費者が開発と解説を行います。
    kafkaを大体知ってから、初めてのプログラムを開発します。
    ここで使う開発言語はJavaで、構築ツールのMavenです。
    Mavenの依存は以下の通りである。
    
     <dependency>
      <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.12</artifactId>
       <version>1.0.0</version>
       <scope>provided</scope> 
      </dependency>
      
      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.0.0</version>
      </dependency>
      
      <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-streams</artifactId>
       <version>1.0.0</version>
      </dependency>
    Kafka Prodcer
    開発生産の際に、kafkaの各種配置説明を簡単に紹介します。
  • bootstrap.servers:kafkaのアドレスです。
  • acks:メッセージの確認メカニズムは、デフォルト値は0です。
  • acks=0:0に設定されている場合、生産者はkafkaの応答を待っていません。
  • acks=1:この構成は、kafkaがこのメッセージをローカルログファイルに書き込むことを意味するが、クラスタ内の他のマシンの成功応答を待つことはない。
  • acks=all:この構成は、すべてのフォロワーが同期して完了するのを待つことを意味する。この保証メッセージは、kafkaクラスタ内の全てのマシンが切断されない限り失われない。これは一番強いユーザビリティ保証です。
  • retries:0より大きい値に設定すると、クライアントはメッセージ送信に失敗したときに再送信する。
  • batch.size:複数のメッセージが同じパーティションに送信される必要がある場合、生産者はネットワーク要求の統合を試みる。これはclientと生産者の効率を高めます。
  • key.serializer:キープロローグ、デフォルトorg.apache.kafka.com on.serialzation.StrigDeserializer。
  • value.deserializer:値序列化、デフォルトorg.apache.kafka.co.serialzation.StrigDeserializer。

  • もっと多くの配置があります。公式文書を見に行きます。ここでは説明しません。
    じゃ、私達のkafkaのプロデュースは以下のように構成されています。
    
      Properties props = new Properties();
      props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
      props.put("acks", "all");
      props.put("retries", 0);
      props.put("batch.size", 16384);
      props.put("key.serializer", StringSerializer.class.getName());
      props.put("value.serializer", StringSerializer.class.getName());
      KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
    kafkaの配置が追加された後、私達はデータを生産し始めました。データコードは以下の通りであればいいです。
    
    producer.send(new ProducerRecord<String, String>(topic,key,value));
  • topic:メッセージ・キューの名前は、先にkafkaサービスで作成することができます。もしkafkaの中でtopicを創建していないならば、それでは自ら創建します!
  • key:キーの値、つまりvalueの対応する値は、Mapと似ています。
  • value:送信するデータは、Stringタイプのデータフォーマットです。
  • 生産者のプログラムを書いてから、まず生産しましょう。
    ここで送ったメッセージは:
    
     String messageStr="  ,   "+messageNo+"   ";
    そして、1000枚だけ送ったら終了します。結果は以下の通りです。

    情報の印刷に成功したのが見えます。
    プログラムで確認したくない場合は、送信に成功したかどうかとメッセージの送信精度は、kafkaサーバー上でコマンドで確認できます。
    Kafka Consmer
    kafkaの消費はこれがポイントです。結局、ほとんどの場合、私たちは主にデータを使って消費します。
    kafka消費の配置は以下の通りです。
  • bootstrap.servers:kafkaのアドレスです。
  • group.id:グループ名の異なるグループ名は重複して消費することができます。例えば、グループ名Aを使って、kafkaの1000件のデータを消費しましたが、この1000件のデータを再び消費したいです。また新たに生成したくないです。ここではグループ名を変更するだけで、繰り返し消費できます。
  • enable.aut.com mmit:自動で提出するかどうか、デフォルトはtrueです。
  • aut.com mmit.interval.ms:pollからのコールバック処理時間が長いです。
  • session.timeout.ms:タイムアウト時間。
  • max.poll.records:一回最大引き取りの本数。
  • at.offset.reet:消費規則、デフォルトのearliest。
    earliest:各パーティションの下に提出済みのoffsetがある場合、提出したoffsetから消費を開始する。提出されたオフセットがない場合は、最初から消費します。
    latest:各パーティションの下に提出済みのoffsetがある場合、提出したoffsetから消費を開始する。提出されたoffsetがない場合は、新たに生成された当該パーティションのデータが消費されます。
    none:topic各パーティションに提出済みのoffsetが存在する場合、offsetから消費を開始します。一つのパーティションが既にコミットされているoffsetが存在しない限り、例外をスローします。
  • key.serializer:キープロローグ、デフォルトorg.apache.kafka.com on.serialzation.StrigDeserializer。
  • value.deserializer:値序列化、デフォルトorg.apache.kafka.co.serialzation.StrigDeserializer。
  • じゃ、私達のkafkaのconsumer配置は以下の通りです。
    
     Properties props = new Properties();
      props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
      props.put("group.id", GROUPID);
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("max.poll.records", 1000);
      props.put("auto.offset.reset", "earliest");
      props.put("key.deserializer", StringDeserializer.class.getName());
      props.put("value.deserializer", StringDeserializer.class.getName());
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    これは設定された自動提出ですので、消費コードは以下の通りです。
    私たちはまずtopicを購読します。つまり、どのtopicを消費するかを指定します。
    
    consumer.subscribe(Arrays.asList(topic));
    購読してから、私達はまたkafkaからデータを引き出します。
    
    ConsumerRecords<String, String> msgList=consumer.poll(1000);
    一般的には消費者会のモニターを使って、ここでfor(;;)を使います。モニターを行い、1000条を設定して終了します。
    結果は以下の通りです

    私達のところではすでに生産データの消費に成功しています。
    コード
    生産者と消費者のコードは以下の通りです。
    生産者:
    
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    /**
     * 
    * Title: KafkaProducerTest
    * Description: 
    * kafka    demo
    * Version:1.0.0 
    * @author pancm
    * @date 2018 1 26 
     */
    public class KafkaProducerTest implements Runnable {
    
     private final KafkaProducer<String, String> producer;
     private final String topic;
     public KafkaProducerTest(String topicName) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
      props.put("acks", "all");
      props.put("retries", 0);
      props.put("batch.size", 16384);
      props.put("key.serializer", StringSerializer.class.getName());
      props.put("value.serializer", StringSerializer.class.getName());
      this.producer = new KafkaProducer<String, String>(props);
      this.topic = topicName;
     }
    
     @Override
     public void run() {
      int messageNo = 1;
      try {
       for(;;) {
        String messageStr="  ,   "+messageNo+"   ";
        producer.send(new ProducerRecord<String, String>(topic, "Message", messageStr));
        //   100    
        if(messageNo%100==0){
         System.out.println("     :" + messageStr);
        }
        //  1000    
        if(messageNo%1000==0){
         System.out.println("     "+messageNo+" ");
         break;
        }
        messageNo++;
       }
      } catch (Exception e) {
       e.printStackTrace();
      } finally {
       producer.close();
      }
     }
     
     public static void main(String args[]) {
      KafkaProducerTest test = new KafkaProducerTest("KAFKA_TEST");
      Thread thread = new Thread(test);
      thread.start();
     }
    }
    消費者:
    
    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    
    /**
     * 
    * Title: KafkaConsumerTest
    * Description: 
    * kafka    demo
    * Version:1.0.0 
    * @author pancm
    * @date 2018 1 26 
     */
    public class KafkaConsumerTest implements Runnable {
    
     private final KafkaConsumer<String, String> consumer;
     private ConsumerRecords<String, String> msgList;
     private final String topic;
     private static final String GROUPID = "groupA";
    
     public KafkaConsumerTest(String topicName) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
      props.put("group.id", GROUPID);
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("auto.offset.reset", "earliest");
      props.put("key.deserializer", StringDeserializer.class.getName());
      props.put("value.deserializer", StringDeserializer.class.getName());
      this.consumer = new KafkaConsumer<String, String>(props);
      this.topic = topicName;
      this.consumer.subscribe(Arrays.asList(topic));
     }
    
     @Override
     public void run() {
      int messageNo = 1;
      System.out.println("---------    ---------");
      try {
       for (;;) {
         msgList = consumer.poll(1000);
         if(null!=msgList&&msgList.count()>0){
         for (ConsumerRecord<String, String> record : msgList) {
          //  100     ,               
          if(messageNo%100==0){
           System.out.println(messageNo+"=======receive: key = " + record.key() + ", value = " + record.value()+" offset==="+record.offset());
          }
          //    1000    
          if(messageNo%1000==0){
           break;
          }
          messageNo++;
         }
        }else{ 
         Thread.sleep(1000);
        }
       }  
      } catch (InterruptedException e) {
       e.printStackTrace();
      } finally {
       consumer.close();
      }
     } 
     public static void main(String args[]) {
      KafkaConsumerTest test1 = new KafkaConsumerTest("KAFKA_TEST");
      Thread thread1 = new Thread(test1);
      thread1.start();
     }
    }
    注:master、slaave 1、slaave 2は自分の環境に関係マップをしたので、これはサーバーのIPに変えられます。
    もちろんプロジェクトはGithubに置いています。興味のあるものは見てもいいです。https://github.com/xuwujing/kafka ( ローカルダウンロード
    締め括りをつける
    簡単にkafkaを開発するには以下の手順が必要です。
  • kafkaサーバーの構築に成功し、起動に成功しました。
  • は、kafkaサービス情報を取得し、コード内で対応する構成を行う。
  • 構成が完了したら、kafkaのメッセージ・キューにメッセージが発生しているかどうかを傍受する。
  • は、生成されたデータをビジネスロジックで処理します。
  • kafka紹介の公式文書:http://kafka.apache.org/intro
    締め括りをつける
    以上はこの文章の全部の内容です。本文の内容は皆さんの学習や仕事に対して一定の参考となる学習価値を持っています。質問があれば、メッセージを書いて交流してください。ありがとうございます。