Spring Cloud Stream-ビルドメッセージイベント駆動のマイクロサービス

16815 ワード

  • は、上記を受けて、Spring Cloud Aliba RocketMQ-非同期通信を構築するためのマイクロサービス
  • を提供する.
    Spring Coud Stream概要
    Spring Coud Streamとは何ですか?
    Spring Cloud StreamはSpring Cloudのサブプロジェクトであり、MQをより便利に操作できるフレームワークであり、メッセージの中間部品に接続された高度に伸縮可能なメッセージイベント駆動のマイクロサービスを構築する目的である.
    簡単に言うとSpring Cloud StreamはMQ動作を簡略化したフレームであり、そのフレームの構図は以下の通りである.
  • 写真はオフィシャル文書から来ています.図からは、アプリケーションがinputとoutputを通じてBinderと相互作用することが分かります.BinderはMQと統合されたマイクロサービスのコンポーネントです.図中のMiddlewareはメッセージ中間のもので、現在はKafka、RabbiitMQおよびRocketMQ
  • をサポートしています.
    Spring Coud Streamプログラミングモデル:
  • ピクチャは、オフィシャルドキュメントから、マイクロサービスがStreamに統合された後、StreamのDestination Binderが2つのBindingを作成し、左のBindingがRabbiitMQに接続され、右のBindingがKafkaに接続されています.左のBindingはRabbiitMQからメッセージを消費し、図中のコードの処理後、処理結果を右のBindingでKafkaに送る.簡単に言えば、このマイクロサービスはRabbiitMQ内のメッセージを消費して処理し、最後に処理結果をKafkaに送ることになる.InputおよびOutputは、メッセージがマイクロサービスに対する方向であり、inputはマイクロサービス受信メッセージを表し、outputは、マイクロサービス配信メッセージまたはメッセージ送信
  • を表している.
    図中の概念について:
  • Destination Binder(ターゲットバインディング):メッセージ中間部と通信するコンポーネントで、メッセージの消費と配達を実現する
  • .
  • Destination Bindings(ターゲットバインディング):Bindingは、メッセージの消費と生産のために、アプリケーションとメッセージの間に接続されたブリッジであり、binderによって
  • を作成する.
    Spring Cloud Streamを使う
    現在は生産者としてのマイクロサービスプロジェクトがあります.このマイクロサービスのためにSpring Cloud Streamを統合します.第一歩はstream依存を追加します.
    
        com.alibaba.cloud
        spring-cloud-starter-stream-rocketmq
    
  • Tips:本プロジェクトのSpring Coudバージョンは、Green wich.SR 1である.Spring Cloud Alibabababaバージョンは、2.1.0.RELEASE
  • です.
    第二のステップは、起動クラスに@EnableBindingの注釈を追加し、以下の通りである.
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    
    @EnableBinding(Source.class)
    ...
    ステップ3では、配置ファイルにstreamに関する設定項目を追加します.
    spring:
      cloud:
        stream:
          rocketmq:
            binder:
              name-server: 192.168.190.129:9876
          bindings:
            #     output
            output:
              #     topic
              destination: stream-test-topic
    以上のステップを完了したら、プロジェクトはもうSpring Cloud Streamに統合されました.今はSpring Coud Streamを使って生産者を作成します.具体的なコードは以下の通りです.
    package com.zj.node.contentcenter.controller.content;
    
    import lombok.RequiredArgsConstructor;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     *    
     *
     * @author 01
     * @date 2019-08-10
     **/
    @RestController
    @RequiredArgsConstructor
    public class TestProducerController {
    
        private final Source source;
    
        @GetMapping("/test-stream")
        public String testStream(){
            Message message = MessageBuilder
                    .withPayload("   ")
                    .build();
            source.output()
                    .send(message);
    
            return "send message success!";
        }
    }
    プロジェクトを起動して、このインターフェースが成功的に実行できるかどうかをテストします.
    もう一つの消費者向けのサービス項目:user-center、Spring Cloud Streamは、依存配置が同じなので、ここでは重複しませんが、配置と注釈の種類は変更が必要です.まず、以下のように構成されています.
    spring:
      cloud:
        stream:
          rocketmq:
            binder:
              name-server: 192.168.190.129:9876
          bindings:
            #     input
            input:
              #     topic
              destination: stream-test-topic
              # rocketmq    group,       
              #         MQ,        
              group: binder-group
    起動クラスのコメントは以下の通りです.
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Sink;
    
    @EnableBinding(Sink.class)
    ...
    統合完了後、Spring Cloud Streamを使って消費者を作成します.具体的なコードは以下の通りです.
    package com.zj.node.usercenter.rocketmq;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.stereotype.Service;
    
    /**
     *    
     *
     * @author 01
     * @date 2019-08-10
     **/
    @Slf4j
    @Service
    public class TestStreamConsumer {
    
        @StreamListener(Sink.INPUT)
        public void receive(String messageBody) {
            log.info("  stream     ,messageBody = {}", messageBody);
        }
    }
    コードの作成が完了したらプロジェクトを開始します.以前はすでに生産者を通じてRocketMQにメッセージを送りましたので、コンソールは受信したメッセージを出力します.
    Spring Coud Streamカスタムインターフェース
    以上の細かい点の勉強を通して、Spring Cloud Streamの基本的な使用を了解しました.上記の例から、inputは1つのtopic消費メッセージをバインドするために使用され、outputは逆に、1つのtopic配信メッセージをバインドするために使用されることが分かる.
    しかし、実際のプロジェクトでは、複数のtopicがあるかもしれません.極端なシーンでも、異なるtopicは異なるMQを使用して実現されるかもしれません.streamがデフォルトで提供しているinputとoutputは一つのtopicしか結合できませんので、この時はstreamのカスタムインターフェースで複数の「input」と「output」の結合が異なるtopicを実現する必要があります.
    以上の小節の例では、生産者がメッセージを送信する際に使用するのはSourceインターフェースのoutput方法であり、消費者がメッセージを送信する時に使用するのはSinkインターフェースのinput方法であり、起動クラスの@EnableBinding注釈に配置する必要があることが分かる.したがって、実際にはカスタムインターフェースのソースコードとこの2つのインターフェースのソースコードはほとんど一致している必要がありますが、名前が異なるだけで、使用もSourceSinkをユーザー定義のインターフェースに変更するだけです.
    次に,どのようにインターフェースをカスタマイズして使うかを簡単に実証し,前の小節の例に基づいて改造した.まず生産者であり、メッセージを送信するためのインターフェースを定義し、具体的なコードは以下の通りである.
    package com.zj.node.contentcenter.rocketmq;
    
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;
    
    /**
     *          , stream     Source      
     *
     * @author 01
     * @date 2019-08-10
     **/
    public interface MySource {
    
        /**
         * Name of the output channel.
         */
        String MY_OUTPUT = "my-output";
    
        /**
         * @return output channel
         */
        @Output(MY_OUTPUT)
        MessageChannel output();
    }
    起動クラス@EnableBindingに、このインターフェースを追加する.
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    
    @EnableBinding({Source.class, MySource.class})
    ...
    設定ファイルに以下の構成を追加します.
    spring:
      cloud:
        stream:
          rocketmq:
            binder:
              name-server: 192.168.190.129:9876
          bindings:
            #     output
            output:
              #     topic
              destination: stream-test-topic
            #     ”output“,        MySource    MY_OUTPUT     
            my-output:
              #      topic
              destination: stream-my-topic          
    生産者のコードを修正すれば次のとおりです.
    package com.zj.node.contentcenter.controller.content;
    
    import com.zj.node.contentcenter.rocketmq.MySource;
    import lombok.RequiredArgsConstructor;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     *    
     *
     * @author 01
     * @date 2019-08-03
     **/
    @RestController
    @RequiredArgsConstructor
    public class TestProducerController {
    
        private final MySource mySource;
    
        @GetMapping("/test-stream")
        public String testStream(){
            Message message = MessageBuilder
                    .withPayload("   ")
                    .build();
            mySource.output()
                    .send(message);
    
            return "send message success!";
        }
    }
    プロジェクトを起動してインターフェースにアクセスし、テストメッセージが正常に送信されますか?
    生産者を改造した後、消費者を改造し、まず消費者メッセージ用のインターフェースを定義します.具体的なコードは以下の通りです.
    package com.zj.node.usercenter.rocketmq;
    
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.messaging.SubscribableChannel;
    
    /**
     *          , stream     Sink      
     *
     * @author 01
     * @date 2019-08-10
     **/
    public interface MySink {
    
        /**
         * Input channel name.
         */
        String MY_INPUT = "my-input";
    
        /**
         * @return input channel.
         */
        @Input(MY_INPUT)
        SubscribableChannel input();
    }
    起動クラスの@EnableBindingにも、このインターフェースを追加する必要があります.
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Sink;
    
    @EnableBinding({Sink.class, MySink.class})
    ...
    設定ファイルに以下の構成を追加します.
    spring:
      cloud:
        stream:
          rocketmq:
            binder:
              name-server: 192.168.190.129:9876
          bindings:
            #     input
            input:
              #     topic
              destination: stream-test-topic
              # rocketmq    group,       
              #         MQ,        
              group: binder-group
            #     ”input“,        MySink    MY_INPUT       
            my-input:
              #      topic
              destination: stream-my-topic
              group: my-group
    消費者のコードを修正すると次のようになります.
    package com.zj.node.usercenter.rocketmq;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.stereotype.Service;
    
    /**
     *    
     *
     * @author 01
     * @date 2019-08-10
     **/
    @Slf4j
    @Service
    public class TestStreamConsumer {
    
        @StreamListener(MySink.MY_INPUT)
        public void receive(String messageBody) {
            log.info("      -   stream     ,messageBody = {}", messageBody);
        }
    }
    プロジェクトを開始します.以前はすでに生産者を通じてRocketMQにメッセージを送りましたので、コンソールは受信したメッセージを出力します.
    Spring Coud Streamのモニタ
    我々はSpring Boot Actutorコンポーネントが監視ポイントを暴露するために使用されることを知っています.多くの監視ツールはこのコンポーネントの監視ポイントに依存して監視を行う必要があります.プロジェクトがStreamとActutorを統合すると、対応する監視ポイントが暴露されます.まずプロジェクトにActutorを統合する必要があります.以下のような依存性を追加します.
    
    
        org.springframework.boot
        spring-boot-starter-actuator
    
    設定ファイルに以下の構成を追加します.
    management:
      endpoints:
        web:
          exposure:
            #         
            include: '*'
      endpoint:
        health:
          #         
          show-details: always
    アクセスhttp://127.0.0.1:{ }/actuatorは、露出したすべての監視エンドポイントを取得することができ、Streamの関連監視ポイントもその列にあり、以下の図である./actuator/bindingsエンドポイントは、bindings関連情報を調べるために使用することができる./actuator/channelsエンドポイントはchanelsの関連情報を調べるために使われますが、「input」と「output」はいわゆるチャンネルです.これらのチャンネルはtopicの抽象と考えられます./actuator/healthエンドポイントでは、binderおよびRocketMQの状態が確認され、主にMQの接続状況を確認するために使用され、そのstatusに接続されていない場合はDOWNとなる.
    Spring Cloud Stream+RocketMQ実現ビジネスメッセージ
    以前Spring Cloud Alibaba RocketMQ-非同期通信を構築するためのマイクロサービスの文末において,RocketMQのビジネスメッセージを紹介し,どのように符号化するかを実証しました.本論文でSpring Cloud Streamを学習した後、Streamと結合して、以前の事務メッセージを実現するコードを再構成する.
    まず設定ファイルを変更します.
    spring:
      cloud:
        stream:
          rocketmq:
            binder:
              name-server: 192.168.190.129:9876
            bindings:
              output:
                producer:
                  #       ,    output  channel          
                  transactional: true
                  #            
                  group: tx-test-producer-group
          bindings:
            #     output
            output:
              #     topic
              destination: stream-test-topic
    次にTestProducerServiceを再構成し、具体的なコードは以下の通りである.
    package com.zj.node.contentcenter.service.test;
    
    import com.alibaba.fastjson.JSON;
    import com.zj.node.contentcenter.dao.content.NoticeMapper;
    import com.zj.node.contentcenter.dao.log.RocketmqTransactionLogMapper;
    import com.zj.node.contentcenter.domain.entity.content.Notice;
    import com.zj.node.contentcenter.domain.entity.log.RocketmqTransactionLog;
    import lombok.RequiredArgsConstructor;
    import org.apache.rocketmq.spring.support.RocketMQHeaders;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;
    
    import java.util.UUID;
    
    /**
     * @author 01
     * @date 2019-08-08
     **/
    @Service
    @RequiredArgsConstructor
    public class TestProducerService {
    
        private final NoticeMapper noticeMapper;
        private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
        private final Source source;
    
        public String testSendMsg(Notice notice) {
            //     id
            String transactionId = UUID.randomUUID().toString();
            //   stream    ,            
            source.output().send(
                    MessageBuilder.withPayload("   ")
                            // header       ,      
                            .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                            .setHeader("notice_id", notice.getId())
                            //        json,          toString        
                            .setHeader("notice", JSON.toJSONString(notice))
                            .build()
            );
    
            return "send message success";
        }
    
        @Transactional(rollbackFor = Exception.class)
        public void updateNotice(Integer noticeId, Notice notice) {
            Notice newNotice = new Notice();
            newNotice.setId(noticeId);
            newNotice.setContent(notice.getContent());
    
            noticeMapper.updateByPrimaryKeySelective(newNotice);
        }
    
        @Transactional(rollbackFor = Exception.class)
        public void updateNoticeWithRocketMQLog(Integer noticeId, Notice notice, String transactionId) {
            updateNotice(noticeId, notice);
            //       
            rocketmqTransactionLogMapper.insertSelective(
                    RocketmqTransactionLog.builder()
                            .transactionId(transactionId)
                            .log("updateNotice")
                            .build()
            );
        }
    }
    最後にTestTransactionListenerを再構成し、具体的なコードは以下の通りである.
    package com.zj.node.contentcenter.rocketmq;
    
    import com.alibaba.fastjson.JSON;
    import com.zj.node.contentcenter.dao.log.RocketmqTransactionLogMapper;
    import com.zj.node.contentcenter.domain.entity.content.Notice;
    import com.zj.node.contentcenter.domain.entity.log.RocketmqTransactionLog;
    import com.zj.node.contentcenter.service.test.TestProducerService;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
    import org.apache.rocketmq.spring.support.RocketMQHeaders;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageHeaders;
    
    /**
     *        
     *
     * @author 01
     * @date 2019-08-08
     **/
    @Slf4j
    @RequiredArgsConstructor
    //    txProducerGroup             
    @RocketMQTransactionListener(txProducerGroup = "tx-test-producer-group")
    public class TestTransactionListener implements RocketMQLocalTransactionListener {
    
        private final TestProducerService service;
        private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
    
        /**
         *            
         */
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            MessageHeaders headers = msg.getHeaders();
            String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
            log.info("        .   id: {}", transactionId);
            Integer noticeId = Integer.parseInt((String) headers.get("notice_id"));
            //    header       json          
            Notice notice = JSON.parseObject((String) headers.get("notice"), Notice.class);
    
            try {
                //            
                service.updateNoticeWithRocketMQLog(noticeId, notice, transactionId);
                //      MQ Server  commit  
                return RocketMQLocalTransactionState.COMMIT;
            } catch (Exception e) {
                log.error("          ,      ", e);
                //      MQ Server  rollback  
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        }
    
        /**
         *              
         */
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            MessageHeaders headers = msg.getHeaders();
            String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
            log.warn("        .   id: {}", transactionId);
    
            //    id      
            RocketmqTransactionLog transactionLog = rocketmqTransactionLogMapper.selectOne(
                    RocketmqTransactionLog.builder()
                            .transactionId(transactionId)
                            .build()
            );
    
            //       id                ,               
            if (transactionLog == null) {
                log.warn("        ,       ,      .   id: {}", transactionId);
                return RocketMQLocalTransactionState.ROLLBACK;
            }
            return RocketMQLocalTransactionState.COMMIT;
        }
    }
    拡張記事:
  • Spring Cloud Streamメッセージフィルタリングを実現する3つの主要な方法
  • .
  • Spring Coud Stream異常処理
  • Spring Coud Stream総括