【初心者向け】Kafka入門編


皆さん、こんにちは!
最近、プロジェクトにおいて、Kafkaという技術を活用したため、今日は本記事でサンプルを作りながら解説してみましょう。

一、Kafkaとは

Kafkaは大規模なストリームデータを扱うことができるオープンソースの分散メッセージングシステムです。レコードのストリームをリアルタイムで公開、サブスクライブ、保存、処理できます。

二、Kafkaの仕組み

Kafkaには4つのコアAPIが存在します。

API 概要
Producer API 送信側のアプリケーションやDBから1つ又は、複数のTopicとしてのストリームデータの配信を許可します。
Consumer API 受信側に1つ又は、複数のTopicとしてのストリームデータの受信と処理を許可します。
Steams API ストリーム プロセッサとして機能します
Connector API Kafka トピックを既存のアプリケーションにリンクする再利用可能なプロデューサーまたはコンシューマー接続を構築することもできます。

KafkaはPublish-Subscribeメッセージモデルを利用して動作します。基本的に下記の通りです。

要素  説明
Producer メッセージの送信元
Consumer  メッセージの配信先
Broker メッセージの収集・配信役

全体的なモデルは下記のイメージを見て理解できると思います。

三、環境構築

1.インストールJava

ここは略です。

2.インストールKafka

このリンクから、Kafkaをダウンロードしてローカルにおいてください。
例:

3.インストールZookeeper

KafkaはZookeeperを依頼するので、必ず予めzookeeperをインストールし、起動します。
ただ、上記の「 2.インストールKafka」でインストールされたKafkaはzookeeperを付いたので、本記事は別途でzookeeperをインストールしない。

四、KafkaとZookeeperの起動

Kafkaを起動するように、必ずzookeeperを予め起動してください。
先ずはCMDを開いて\bin\windowsのしたに入ってください。

  • zookeeperは下記のコマンドで起動してください。(zookeeper.propertiesのパスに合わせてください。)

zookeeper-server-start.bat C:\software\kafka_2.12-2.6.0\config\zookeeper.properties

  • Kafkaは下記のコマンドで起動してください。(server.propertiesのパスに合わせてください。)

kafka-server-start.bat C:\software\kafka_2.12-2.6.0\config\server.properties

KafkaとZookeeper両方とも、無事に起動したら、下記のコマンドでKafkaのTopicを作成します。
本記事はTopicがtest-topic01となります。

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic01

五、サンプルを実装

1.先ず、Mavenプロジェクトを作って下記の依頼をpom.xmlに追加します。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

2.Producter クラスを作る

public class TestProducer {
    public static void main(String[] args) throws  Exception{
        Properties properties = new Properties();
        //Kafkaのホストとポートを設定する
        properties.setProperty("bootstrap.servers","127.0.0.1:9092");
        //key Serialize
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        //value Serialize
        properties.setProperty("value.serializer",StringSerializer.class.getName());
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // Messageを作る。
        // messageのkey:testKey,messageのvalue:hello, this is Kafka producer!
        ProducerRecord<String, String> stringStringProducerRecord = new ProducerRecord<String, String>("test-topic01",0,"testKey","hello, this is Kafka producer!");

        // ログ出力
        System.out.println("kafka producer start....");

        Future<RecordMetadata> send = kafkaProducer.send(stringStringProducerRecord);

        // ログ出力
        System.out.println("producer send data:" + stringStringProducerRecord.value());
        RecordMetadata recordMetadata = send.get();
        System.out.println(recordMetadata);
    }

}

3.Consumerクラスを作る

public class TestConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Kafkaのホストとポートを設定する
        properties.setProperty("bootstrap.servers","127.0.0.1:9092");
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer",StringDeserializer.class.getName());
        properties.setProperty("group.id","1111");
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);

        // topicを設定する
        consumer.subscribe(Collections.singletonList("test-topic01"));

       // ログ出力
        System.out.println("kafka consumer start...");
        while (true){
            ConsumerRecords<String, String> poll = consumer.poll(500);
            for (ConsumerRecord<String, String> stringStringConsumerRecord : poll) {
                // Producerから取得されたデータを出力する
                System.out.println("Consumer get data from kafka producer:" + stringStringConsumerRecord.value());
            }
        }
    }

}

ここまで、Kafkaサンプルを作り終わりました。

六、動作確認

先ず、TestConsumerクラスをjava applicationの形で起動しておいてください。
下記のログ出力したら、OKです。

kafka consumer start...

後で、TestProducerクラスをjava applicationの形で起動してください。
正常に起動したら、TestProducerはメッセージを作ってKafkaまで渡して、
TestConsumerはKafkaからTestProducerが作ったメッセージを取得して出力します。
流れは基本的にこんな感じです。
じゃあ、TestConsumerの出力ログを確認しましょう。

上記のイメージを見ると、サンプルは正常に動けることがわかりました。

七、最後に

最後まで読んでいただき、ありがとうございます。