Spring cloud stream Kafka


Springは、メッセージングシステムSpring Cloud Streamの抽象的な実装を提供します.

Spring Cloud Stream


Spring Cloud Streamを使用するアプリケーションは、ミドルウェア(Kafka、Rabbit MQなど)と直接的な依存関係はなく、中間通信Spring Cloud Streamが提供するBinderという実装プログラムであり、いずれのミドルウェアにも強く結合することなくアプリケーションを開発することができる.

Spring Cloud Streamの典型的なアーキテクチャ



各要素については、このようにします.
Binderとは、ミドルウェアとの通信を担当するコンポーネントです.
Binding(input/output)は、ミドルウェアと通信するブリッジである.
Spring bootでSpring cloud streamを使用する場合、ミドルウェアKafkaとの使用方法について説明します.
まず2つの依存項目を追加する必要があります.
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
次に,@Configurationクラスにチャネル情報を含むインタフェース(@EnableBinding)を追加する.次の例ではSinkを示します.Spring Cloud Streamのデフォルトチャネルであるclassが追加されました.
ユーザーはSinkインタフェース(classなど)で定義されたチャネルを介してミドルウェアと通信します.
@EnableBinding(Sink.class)
@SpringBootApplication
public class SpringCloudStreamKafkaExampleApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringCloudStreamKafkaExampleApplication.class, args);
    }
}

@EnableBindingクラスにはどのようなインタフェースが含まれますか?


Spring Cloud Streamには、デフォルトで次の3つのインタフェースがあります.
public interface Sink {
    String INPUT = "input";

    @Input("input")
    SubscribableChannel input();
}

public interface Source {
    String OUTPUT = "output";

    @Output("output")
    MessageChannel output();
}

public interface Processor extends Source, Sink {
}
上のコードではSinkはINPUTを表し,SourceはOUTPUTを表し,最後にProcessorはこの2つの部分を含む.ここでINPUTは消費者の視点から購読するTOPIC名を表し,OUTPUTは生産者の視点から公開するTOPIC名を表す.
ユーザは、上記のチャネルを以下のように定義することができる.
public interface ProcessMessage {
    String SEND_MESSAGE = "send-message";
    String RECIVE_MESSAGE = "recive-message";

    @Output(SEND_MESSAGE)
    MessageChannel sendMessage();

    @Input(RECIVE_MESSAGE)
    SubscribableChannel getMessage();
}


@EnableBinding(ProcessMessage.class)
@SpringBootApplication
public class SpringCloudStreamKafkaExampleApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringCloudStreamKafkaExampleApplication.class, args);
    }
}
最後に、INPUT、OUTPUTチャネルを設定すると、アプリケーションは次のようになります.変更yml
Spring cloud stream Kafkaを使用して基本設定を完了します.
(次のアプリケーション.ymlはINPUT(Consumer)のみ設定)
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost			# 브로커 IP
          defaultBrokerPort: 9092		# 브로커 포트
          consumer-properties:
		…
      bindings:
        recive-message: 				# INPUT
          destination: test				# 토픽명
          group: myTestGroup			# Group Id