Spring Cloud StreamとRabitMQの統合を紹介します.


Spring Cloud Streamは、Spring BootとSpring Integrationの上に構築されたフレームワークであり、イベント駆動やメッセージ駆動のマイクロサービスの作成に役立ちます.本論文ではSpring Cloud Streamの概念と構造をいくつかの簡単な例で紹介します.
1 Maven依存
開始前に、私たちはSpring Cloud StreamとRabbiitMQメッセージの中間デバイスの依存性を追加する必要があります.
<dependency>
    <groupId>org.springframework.cloudgroupId>
    <artifactId>spring-cloud-starter-stream-rabbitartifactId>
dependency>
Junnitユニットテストに対応するために、pom.xmlファイルに追加します.
<dependency>
    <groupId>org.springframework.cloudgroupId>
    <artifactId>spring-cloud-stream-test-supportartifactId>
    <scope>testscope>
dependency>
2主要概念
マイクロサービスアーキテクチャは「スマートエンドポイントとダミーパイプ」の原則に従う.エンドポイント間の通信は、ラビットMQまたはApache Kafkaのようなメッセージ中間体によって駆動される.サービスは、これらのエンドポイントまたはチャネルを介してイベントをリリースし、通信する.
このメッセージ駆動サービスの基本的な構成例を通して、Spring Cloud Streamフレームのいくつかの主要な概念を見てみましょう.
2.1サービスクラス
Spring Cloud Streamを介して簡単なアプリケーションを確立し、Inputチャネルからメッセージを傍受し、Outputチャネルに応答する.
@SpringBootApplication
@EnableBinding(Processor.class)
public class MyLoggerServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyLoggerServiceApplication.class, args);
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public LogMessage enrichLogMessage(LogMessage log) {
        return new LogMessage(String.format("[1]: %s", log.getMessage()));
    }
}
このアプリケーションはINPUTとOUTを結合しているとコメントしました.この2つのチャネルはインターフェースProcessorで定義されている(Spring Cloud Streamデフォルト設定).すべてのチャネルは、特定のメッセージ中間またはバインディング内に配置されている.
これらの概念の定義を見てみましょう.
  • Bindings-入出力チャネルのインターフェースセットを宣言します.
  • Binder−メッセージ中間実装、例えばKafkaまたはRabbiitMQ
  • Chanel-メッセージ中間部とアプリケーション間の通信パイプライン
  • を表します.
  • Stream Listeners-beanにおけるメッセージ処理方法は、中間デバイスのMessage Coverter特定イベントにおいてオブジェクトの順序付け/逆順序化を行った後、チャネル上のメッセージ上で自動的にメッセージ処理方法を呼び出す.
  • Message Schemasは、メッセージの序文化および逆序文化のために使用され、これらのモードは、オブジェクトタイプの発展を静的に読み取ることができ、または動的にロードすることができる.
  • メッセージを指定の目的地に配信するのは、購読メッセージモードをリリースすることによって行われる.発表者はメッセージをテーマに分類し、各テーマは名称によって識別される.購読者は一つ以上のテーマに興味を示します.中間デバイスフィルタメッセージは、関心のあるテーマを購読サーバに転送します.加入者は、グループIDによって識別される加入者または消費者のグループであり、主題または主題のパーティションからメッセージが負荷バランスで配信される.
    2.2試験クラス
    テストクラスは、チャネルとの対話とメッセージのチェックを可能にするバインディング装置の実装である.上のenrichLogMessageサービスにメッセージを送り、応答にテキストが含まれているかどうか確認します.
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(classes = MyLoggerServiceApplication.class)
    @DirtiesContext
    public class MyLoggerApplicationIntegrationTest {
        @Autowired
        private Processor pipe;
    
        @Autowired
        private MessageCollector messageCollector;
    
        @Test
        public void whenSendMessage_thenResponseShouldUpdateText() {
            pipe.input().send(MessageBuilder.withPayload(new LogMessage("This is my message")).build());
    
            Object payload = messageCollector.forChannel(pipe.output()).poll().getPayload();
    
            assertEquals("[1]: This is my message", payload.toString());
        }
    }
    2.3 RabbiitMQ構成
    私たちは工程src/main/resourceディレクトリのappication.ymlファイルにRabitMQバインドの配置を追加する必要があります.
    spring:
      cloud:
        stream:
          bindings:
            input:
              destination: queue.log.messages
              binder: local_rabbit
              group: logMessageConsumers
            output:
              destination: queue.pretty.log.messages
              binder: local_rabbit
          binders:
            local_rabbit:
              type: rabbit
              environment:
                spring:
                  rabbitmq:
                    host: localhost
                    port: 5672
                    username: guest
                    password: guest
                    virtual-host: /
    inputバインディングは、queue.log.messagesというメッセージスイッチを使用して、outputバインディングは、queue.pretty.logs.messageというメッセージスイッチを使用しています.すべてのバインディングにはlocal_という名前が使われています.ラビットのバインド.RabbitmQスイッチやキューを事前に作成する必要はありませんので、ご注意ください.アプリケーションを実行すると、二つのスイッチが自動的に作成されます.
    3カスタムチャンネル
    上記の例では、Spring Cloudで提供されたProcessorインターフェースを使って、このインターフェースにはinputチャネルとout putチャネルがあります.
    もし私たちがいくつかの異なるチャンネルを作りたいなら、例えばinputチャンネルとoutputチャンネルを二つ作って、新しいカスタムプロセッサを作ることができます.
    public interface MyProcessor {
        String INPUT = "myInput";
    
        @Input
        SubscribableChannel myInput();
    
        @Output("myOutput")
        MessageChannel anOutput();
    
        @Output
        MessageChannel anotherOutput();
    }
    3.1サービス類
    Springはこのインターフェースの実現を提供してくれます.チャンネル名は、注釈を用いて@Output(「myOutput」)のように設定することができます.もし設定されていないなら、Springは通路名として名来を使用します.ここには三つの通路があります.myInput、myOutput、another Output.
    現在、私達はいくつかのルーティングルールを追加できます.もし受信した値が10未満なら、outputチャネルを通ります.受信した値が10以上の場合は、他のout putチャネルを通ります.
    @Autowired
    private MyProcessor processor;
    
    @StreamListener(MyProcessor.INPUT)
    public void routeValues(Integer val) {
        if (val < 10) {
            processor.anOutput().send(message(val));
        } else {
            processor.anotherOutput().send(message(val));
        }
    }
    
    private static final  Message message(T val) {
        return MessageBuilder.withPayload(val).build();
    }
    3.2試験類
    異なるメッセージを送信し、戻り値が異なるチャネルで得られるかどうかを判定する.
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(classes = MultipleOutputsServiceApplication.class)
    @DirtiesContext
    public class MultipleOutputsServiceApplicationIntegrationTest {
        @Autowired
        private MyProcessor pipe;
    
        @Autowired
        private MessageCollector messageCollector;
    
        @Test
        public void whenSendMessage_thenResponseIsInAOutput() {
            whenSendMessage(1);
            thenPayloadInChannelIs(pipe.anOutput(), 1);
        }
    
        @Test
        public void whenSendMessage_thenResponseIsInAnotherOutput() {
            whenSendMessage(11);
            thenPayloadInChannelIs(pipe.anotherOutput(), 11);
        }
    
        private void whenSendMessage(Integer val) {
            pipe.myInput().send(MessageBuilder.withPayload(val).build());
        }
    
        private void thenPayloadInChannelIs(MessageChannel channel, Integer expectedValue) {
            Object payload = messageCollector.forChannel(channel).poll().getPayload();
            assertEquals(expectedValue, payload);
        }
    }
    4条件に応じて分配する.
    @Stream Listenerのコメントを使用して、ユーザーが望むメッセージをフィルタリングするためにカスタムSpEL式を使用することもできます.以下の例では、条件スケジュールを用いてメッセージを異なる出力にルーティングする.
    @Autowired
    private MyProcessor processor;
    
    @StreamListener(
      target = MyProcessor.INPUT, 
      condition = "payload < 10")
    public void routeValuesToAnOutput(Integer val) {
        processor.anOutput().send(message(val));
    }
    
    @StreamListener(
      target = MyProcessor.INPUT, 
      condition = "payload >= 10")
    public void routeValuesToAnotherOutput(Integer val) {
        processor.anotherOutput().send(message(val));
    }
    5まとめ
    本教程では、Spring Cloud Streamの主な概念を紹介し、RabitMQ上のいくつかの簡単な例を通してどのように使用するかを示した.プロジェクトコードは参照できます.https://download.csdn.net/download/peterwanghao/10412121