マイクロサービスフレームワーク的なものを目指して (Apach Kafka編)


入門した背景

  • Microservicesについて勉強した結果、何か作ってみたくなった。そこでサービス間の通信にどういう技術選定をしようかと考えていたところ、いくつか作ってみようと思い、とりあえずSpringKafka入門してみるかと思った。
  • 最終的にはSpirngBootLayer + BusinessLogicLayer + RepositoryLayerという構成のサービスを作って、BusinessLogicLayer以外はapplication.ymlか何かで設定を切り替えれば、そのサービスの通信・データストアを切り替えることが可能なフレームワークを作ってみようと思う。
  • これがリリース容易性・テスタビリティ・変更容易性を担保した仕組みになっているかと聞かれると、微妙な気がする。。
  • 既に色々欠点が考えられるが何はともあれ作ってみよう。そうすることで何か見えてくることがあるはず。まずはSpringKafkaに入門。

Apache Kafka

公式docを読んで、大体の構成・コンセプトを理解。

機能概要

  • 業務アプリケーションの扱うデータをpub/subモデルでもpullモデルでも配信できるhubのようなもの
  • 配信したデータをfault-tolerantな構成で管理(クラスタ構成)
  • データが発生したと同時に配信可能
  • 配信データ発生時のログ生成により、配信メッセージの順序が担保される
  • データの1レコードはkey,value,timestampで構成
  • topicというキューのようなものにデータを格納していく
  • topicに対して送信したデータがtopicに紐付けたConsumerGroupConsumerがデータを受信できる

SpringKafkaでメッセージ送信

apache kafkaをインストール

  • こちらの記事を参考にローカルにApache Kafkaをインストールして起動

  • どうやらzookeeperという管理サービスがいるらしい。こちらの記事zookeeperkafkaの構成がわかりやすく記載されていたので参考にさせていただきました。

Spring Kafkaからデータ送信

  • SpringInitilizerwebSpringKafkaを依存に含めて雛型作成。

  • ローカルのkafkaサーバにtopicを作成する設定をBean登録する

KafkaConfig.java
package kafka.sample.springKafka.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaConfig {
    @Bean
    public NewTopic userTopic() {
        return new NewTopic("user", 10, (short) 1);
    }
}
  • Apache Kafkaにデータを送信する実装をBean登録しておく
KafkaServiceTemplate.java
package kafka.sample.springKafka.template;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Component
public class KafkaServiceTemplate {
    @Autowired
    private KafkaTemplate<String, String> template;

    public void send(String key, String value) {
        template.send(key, value).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                System.out.println(result);

            }

            @Override
            public void onFailure(Throwable ex) {
                System.out.println(ex.getMessage());

            }

        });
    }

}
  • 最後にRestAPIでPOSTされたユーザ名でデータを登録する実装
UserController.java
package kafka.sample.springKafka.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;

import kafka.sample.springKafka.template.KafkaServiceTemplate;

@Controller
public class UserController {
    @Autowired
    private KafkaServiceTemplate template;

    @RequestMapping(path = "/register/user", method = RequestMethod.POST)
    public String register(@RequestBody String username) {
        template.send("user", username);
        return username;
    }
}

アプリケーションを起動して、curlでPOSTをする。
その後、ローカルでインストールされたモジュールに含まれているCLIツールでアプリケーションで作成したtopicを最初からconsumeする。Apache Kafkaサーバはデフォルトではポート9092で上がってくる。

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic user --from-beginning
testUser

これでデータ配信は実装完了。
簡単に実装できた。

次回はもう少し複数サーバでのpub/subとか試してみよう。