Spring Cloud Aliba RocketMQ-非同期通信を構築するマイクロサービス


MQ導入後のアーキテクチャの進化
Spring Cloud Alibaba RocketMQ - 构建异步通信的微服务_第1张图片
MQの選択
メッセージキュー比較参照テーブル:Spring Cloud Alibaba RocketMQ - 构建异步通信的微服务_第2张图片
RocketMQ vs. ActiveMQ vs. Kafka: Spring Cloud Alibaba RocketMQ - 构建异步通信的微服务_第3张图片
参照先:
  • メッセージキュー比較参照テーブル
  • RocketMQ vs. ActiveMQ vs. Kafka

  • CentOS 7上にRocketMQを構築
    環境要件:
  • CentOS 7.2
  • 64ビットJDK 1.8+
  • 4 G+の空きディスク容量
  • 1、RocketMQのバイナリパッケージをダウンロードします.ここでは4.5.1バージョンを使用しています.ダウンロード先は以下の通りです.
    http://rocketmq.apache.org/release_notes/release-notes-4.5.1/
    wgetコマンドを使用してダウンロード:
    [root@study-01 ~]# cd /usr/local/src
    [root@study-01 /usr/local/src]# wget http://mirror.bit.edu.cn/apache/rocketmq/4.5.1/rocketmq-all-4.5.1-bin-release.zip

    2、ダウンロードした圧縮パッケージを解凍し、適切なディレクトリの下に移動する.
    [root@study-01 /usr/local/src]# unzip rocketmq-all-4.5.1-bin-release.zip
    [root@study-01 /usr/local/src]# mv rocketmq-all-4.5.1-bin-release /usr/local/rocketmq-4.5.1

    注意:unzipコマンドがインストールされていない場合は、yum install-y unzipのコマンドを使用してインストールします.
    3、rocketmqのルートディレクトリに入り、次のディレクトリとファイルが含まれているかどうかを確認します.
    [root@study-01 /usr/local/src]# cd /usr/local/rocketmq-4.5.1
    [root@study-01 /usr/local/rocketmq-4.5.1]# ls
    benchmark  bin  conf  lib  LICENSE  NOTICE  README.md

    4、問題がなければ、次のコマンドを使用してName Serverを起動します.
    [root@study-01 /usr/local/rocketmq-4.5.1]# nohup sh bin/mqnamesrv &
    [1] 2448
    [root@study-01 /usr/local/rocketmq-4.5.1]# 

    5.デフォルトの9876ポートがリスニングされているかどうかを確認し、Name Serverが正常に起動したかどうかを確認します.
    [root@study-01 /usr/local/rocketmq-4.5.1]# netstat -lntp |grep java
    tcp6       0      0 :::9876                 :::*                    LISTEN      2454/java           
    [root@study-01 /usr/local/rocketmq-4.5.1]#

    6、Brokerを起動する:
    [root@study-01 /usr/local/rocketmq-4.5.1]# nohup sh bin/mqbroker -n localhost:9876 &
    [2] 2485
    [root@study-01 /usr/local/rocketmq-4.5.1]# 

    7、Brokerが起動に成功したかどうかを検証し、起動に成功した場合、次のようなログが表示されます.
    [root@study-01 /usr/local/rocketmq-4.5.1]# cat ~/logs/rocketmqlogs/broker.log |grep "boot success"
    2019-08-04 01:27:38 INFO main - The broker[study-01, 192.168.190.129:10911] boot success. serializeType=JSON and name server is localhost:9876
    [root@study-01 /usr/local/rocketmq-4.5.1]# 

    Name ServerとBrokerを停止するには、次の2つのコマンドを順に実行します.
    [root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/mqshutdown broker
    The mqbroker(2492) is running...
    Send shutdown request to mqbroker(2492) OK  #            
    [root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/mqshutdown namesrv
    The mqnamesrv(2454) is running...
    Send shutdown request to mqnamesrv(2454) OK  #            
    [2]+     143              nohup sh bin/mqbroker -n localhost:9876
    [root@study-01 /usr/local/rocketmq-4.5.1]#

    RocketMQ機能が正常であることを確認
    1、生産メッセージが正常であることを検証し、以下のコマンドを実行する.
    [root@study-01 /usr/local/rocketmq-4.5.1]# export NAMESRV_ADDR=localhost:9876
    [root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

    通常、次のような出力が表示されます.これは、メッセージを生成した後に成功したresultです.
    SendResult [sendStatus=SEND_OK, msgId=C0A8BE810A690D7163610FCC253B03E7, offsetMsgId=C0A8BE8100002A9F000000000002BDFE, messageQueue=MessageQueue [topic=TopicTest, brokerName=study-01, queueId=3], queueOffset=249]

    2、消費メッセージが正常であることを検証し、以下のコマンドを実行する.
    [root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

    通常、次のような出力が表示されます.これは消費されたメッセージの内容です.
    ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=3, storeSize=180, queueOffset=242, sysFlag=0, bornTimestamp=1564853837073, bornHost=/192.168.190.129:34708, storeTimestamp=1564853837074, storeHost=/192.168.190.129:10911, msgId=C0A8BE8100002A9F000000000002AA4E, commitLogOffset=174670, bodyCRC=911284903, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1564854006859, UNIQ_KEY=C0A8BE810A690D7163610FCC251103CB, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 55, 49], transactionId='null'}]]

    RocketMQコンソールの構築
    RocketMQはSpring Bootに基づいて開発されたビジュアル化コンソールを公式に提供しており、RocketMQの稼働状況を簡単に確認し、運用効率を向上させることができます.このセクションでは、RocketMQを構築するコンソールの構築方法について説明します.私たちが使用しているRocketMQバージョンは4.5.1なので、RocketMQの4.5.1バージョンに合うようにコンソールのソースコードを変更する必要があります.
    1、まずソースコードをダウンロードする必要があります.2つの方法があります.1つはgitクローンコード倉庫を使用すること、2つはrocketmq-externalsのzipパッケージを直接ダウンロードすることです.私はgit方式を使用して、以下のコマンドを実行します.
    git clone https://github.com/apache/rocketmq-externals.git

    2、コンソールコードを修正し、IDEでrocketmq-console項目を開く.下図のように:Spring Cloud Alibaba RocketMQ - 构建异步通信的微服务_第4张图片
    2.1、プロジェクト中のapplication.properties配置ファイルを修正する.ここでは主にリスニングポートとName Serverの接続アドレスを修正した.その他の配置項目について必要があれば、説明に従って自分で修正することができる.
    # console     ,   8080
    server.port=8011
    # Name Server     ;   ,      console ,        -    - NameSvrAddrList    
    rocketmq.config.namesrvAddr=192.168.190.129:9876

    2.2、依存を修正します.consoleプロジェクトのデフォルトで使用されているrocketmqバージョンは4.4.0で、ここで使用しているのは4.5.1と完全に互換性がないので、依存バージョンを修正して、この行を見つける必要があります.
    4.4.0

    次のように変更します.
    4.5.1

    2.3、コードを修正し、rocketmqのバージョンを修正したため、org.apache.rocketmq.console.service.impl.MessageServiceImpl#queryMessageByTopic方法のコンパイルエラーが発生するので、ここのコードを変更する必要があります.
    @Override
    public List queryMessageByTopic(String topic, final long begin, final long end) {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, null);
        ...

    次のように変更します.
    @Override
    public List queryMessageByTopic(String topic, final long begin, final long end) {
        RPCHook rpcHook = null;
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook);
        ...

    3、構築をパッケージして起動し、ideaのterminalを開き、次のコマンドを実行します.
    #  rocketmq-console     
    mvn clean package -DskipTests
    
    #   jar     
    cd target
    
    #   rocketmq console
    java -jar rocketmq-console-ng-1.0.1.jar

    4、ブラウザを使ってコンソールにアクセスする.ここではポートを修正したので、アクセスアドレスはhttp://localhost:8011である.通常の状況では、Spring Cloud Alibaba RocketMQ - 构建异步通信的微服务_第5张图片
    英語に慣れていなければ右上で言語を切り替えることができます:Spring Cloud Alibaba RocketMQ - 构建异步通信的微服务_第6张图片
    コンソールはビジュアル化されたインタフェースであり、中国語をサポートしているため、ここでは多くの説明にすぎません.公式のコンソール使用説明ドキュメントを参照してください.
  • RocketMQ使用文書
  • RocketMQ用語と概念
    ここでは基本的な用語と概念を簡単に思考ガイドにまとめました:Spring Cloud Alibaba RocketMQ - 构建异步通信的微服务_第7张图片
    公式ドキュメント:
  • Apache RocketMQ開発者ガイド
  • Springメッセージプログラミングモデル-作成者
    上記のセクションでRocketMQを構築した後、Springのメッセージプログラミングモデルを使用して、簡単な例を作成します.まず、プロジェクトに依存関係を追加する必要があります.
    
        org.apache.rocketmq
        rocketmq-spring-boot-starter
        2.0.3
    

    プロファイルにrocketmqを追加する構成は、次のとおりです.
    rocketmq:
      name-server: 192.168.190.129:9876
      producer:
        #   :    group
        group: test-group

    生産者のコードを作成します.ここではControllerを例に挙げます.具体的なコードは以下の通りです.
    package com.zj.node.contentcenter.controller.content;
    
    import lombok.Data;
    import lombok.RequiredArgsConstructor;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.Date;
    
    /**
     *    
     *
     * @author 01
     * @date 2019-08-03
     **/
    @RestController
    @RequiredArgsConstructor
    public class TestProducerController {
    
        /**
         *         RocketMQ  api
         */
        private final RocketMQTemplate rocketMQTemplate;
    
        @GetMapping("/test-rocketmq/sendMsg")
        public String testSendMsg() {
            String topic = "test-topic";
            //     
            rocketMQTemplate.convertAndSend(topic, MyMessage.getInstance());
    
            return "send message success";
        }
    }
    
    @Data
    class MyMessage {
        private Integer id;
        private String name;
        private String status;
        private Date createTime;
    
        static MyMessage getInstance() {
            MyMessage message = new Message();
            message.id = 1;
            message.name = "×××";
            message.status = "default";
            message.createTime = new Date();
    
            return message;
        }
    }

    作成が完了したら、プロジェクトを開始し、このインタフェースにアクセスします:Spring Cloud Alibaba RocketMQ - 构建异步通信的微服务_第8张图片
    メッセージ送信に成功したら、RocketMQのコンソールで確認できます:Spring Cloud Alibaba RocketMQ - 构建异步通信的微服务_第9张图片
    メッセージ・ボディは、メッセージの詳細を次のように表示できます.Spring Cloud Alibaba RocketMQ - 构建异步通信的微服务_第10张图片
    生産者のコードから見ると、非常に簡単で、RocketMQTemplateを1つ使うだけでオブジェクトをメッセージ体に変換してメッセージを送信することができます.実際にはRocketMQ以外にも、他のMQにも対応するTemplateがあり、以下のようになっています.
  • RocketMQ:RocketMQTemplate
  • ActiveMQ/Artemis:JmsTemplate
  • RabbitMQ:AmqpTemplate
  • Kafka:KafkaTemplate

  • Springメッセージプログラミングモデル-消費者の作成
    消費者プロジェクトでは、rocketmqの依存も追加する必要があります.
    
        org.apache.rocketmq
        rocketmq-spring-boot-starter
        2.0.3
    

    同様に、Name Serverの接続先を設定する必要があります.
    rocketmq:
      name-server: 192.168.190.129:9876

    消費者のコードを作成します.具体的なコードは以下の通りです.
    package com.zj.node.usercenter.rocketmq;
    
    import com.alibaba.fastjson.JSON;
    import lombok.Data;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    
    /**
     *       
     *
     * @author 01
     * @date 2019-08-03
     **/
    @Slf4j
    @Component
    // topic       topic  ,consumerGroup        ,      
    @RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer-group")
    public class TestConsumerListener implements RocketMQListener {
    
        /**
         *                
         *
         * @param message    
         */
        @Override
        public void onMessage(MyMessage message) {
            log.info(" test-topic      ");
            log.info(JSON.toJSONString(message));
        }
    }
    
    /**
     *          
     */
    @Data
    class MyMessage {
        private Integer id;
        private String name;
        private String status;
        private Date createTime;
    }

    作成が完了したらプロジェクトを開始します.以前はキューにメッセージを送信していたので、消費者プロジェクトが起動すると、メッセージを傍受して消費することができます.コンソールは次のログを出力します.Spring Cloud Alibaba RocketMQ - 构建异步通信的微服务
    RocketMQトランザクションメッセージ
    RocketMQがトランザクション・メッセージをサポートしていることはよく知られていますが、これは多くの人がRocketMQをメッセージ・ミドルウェアとして使用することを選んだ大きな原因であり、RocketMQの大きな特定でもあります.RocketMQトランザクションメッセージの流れを下図に示します.Spring Cloud Alibaba RocketMQ - 构建异步通信的微服务_第11张图片
  • 画像:http://rocketmq.apache.org/rocketmq/the-design-of-transactional-message

  • 原図は英語なので、大まかな翻訳を行いました.Spring Cloud Alibaba RocketMQ - 构建异步通信的微服务_第12张图片
    プロセスを簡単に分析します.
    1、生産者はMQサーバーに半メッセージを送信し、半メッセージは特殊なメッセージであり、このメッセージはMQサーバーに格納されるが、一時的に配達できない状態としてマークされるため、消費者はこのメッセージを消費しない
    2.半メッセージ送信に成功すると、生産者はローカル事務を実行する
    3、生産者はローカルトランザクションの実行結果に基づいて、MQサーバにcommitまたはrollbackメッセージを二次確認する.MQサーバがcommitを受信すると、メッセージの半分が配達可能状態としてマークされ、消費者は消費することができる.逆に,MQサーバがrollbackを受信するとメッセージの半分が破棄され,消費者は消費できなくなる.
    4、MQサーバーが二次確認のメッセージを受信していない場合、または生産者がローカルトランザクションの実行を一時停止した場合、MQサーバーはタイミング(デフォルト1分)で生産者にレビューメッセージを送信し、生産者のローカルトランザクションのステータスを確認します.その後、生産者は、レビューされたローカルトランザクションの実行結果に基づいて、commitまたはrollbackメッセージをMQサーバに再送信します.
    概念用語:
  • ハーフメッセージ(Half Message):一時的に消費できないメッセージで、生産者はMQサーバにメッセージを送信したが、このメッセージは「一時的に配達できない」状態とマークされ、先に格納される.消費者はこのニュースを消費しません
  • メッセージレビュー(Message Status Check):ネットワークの切断または生産者の再起動により、トランザクション・メッセージが失われる可能性のある2回目の確認.MQサーバは、メッセージが長い間半メッセージ状態であることを発見すると、メッセージの最終状態(コミットまたはロールバック)
  • を問い合わせる要求をメッセージプロバイダに送信する.
    メッセージ3:
  • Commit:トランザクションメッセージを送信し、消費者はこのメッセージ
  • を消費することができる.
  • Rollback:トランザクションメッセージをロールバックし、brokerはメッセージを削除し、消費者は
  • を消費できません.
  • UNKNOWN:brokerメッセージのステータスを確認する必要がある
  • エンコーディングによるRocketMQトランザクションメッセージの実装
    RocketMQトランザクションメッセージを実装するには、フローチャートに従っていくつかのコードを記述する必要があります.エンコーディングを開始する前に、まずデータベースにRocketMQのトランザクション・ログ・テーブルを作成し、ローカル・トランザクション・レビューの根拠として使用します.テーブル構造は次のとおりです.Spring Cloud Alibaba RocketMQ - 构建异步通信的微服务
    次に、トランザクション・メソッドとして動作するデータ・テーブルを再構築します.テーブル構造は次のとおりです.Spring Cloud Alibaba RocketMQ - 构建异步通信的微服务
    次にコードを書き始め、まずサービスを定義します.そこにはトランザクション注釈付きの方法とトランザクションメッセージを送信する方法があります.具体的なコードは以下の通りです.
    package com.zj.node.contentcenter.service.test;
    
    import com.zj.node.contentcenter.dao.content.NoticeMapper;
    import com.zj.node.contentcenter.dao.log.RocketmqTransactionLogMapper;
    import com.zj.node.contentcenter.domain.entity.content.Notice;
    import com.zj.node.contentcenter.domain.entity.log.RocketmqTransactionLog;
    import lombok.Data;
    import lombok.RequiredArgsConstructor;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.apache.rocketmq.spring.support.RocketMQHeaders;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;
    
    import java.util.Date;
    import java.util.UUID;
    
    /**
     * @author 01
     * @date 2019-08-08
     **/
    @Service
    @RequiredArgsConstructor
    public class TestProducerService {
    
        private final RocketMQTemplate rocketMQTemplate;
        private final NoticeMapper noticeMapper;
        private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
    
        public String testSendMsg(Notice notice) {
            // topic
            String topic = "test-topic";
            //          
            String txProducerGroup = "tx-test-producer-group";
            //     id
            String transactionId = UUID.randomUUID().toString();
            //      
            rocketMQTemplate.sendMessageInTransaction(
                    txProducerGroup, topic,
                    //    
                    MessageBuilder.withPayload("    ")
                            // header       ,      
                            .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                            .setHeader("notice_id", notice.getId())
                            .build(),
                    //    executeLocalTransaction   
                    notice);
    
            return "send message success";
        }
    
        @Transactional(rollbackFor = Exception.class)
        public void updateNotice(Integer noticeId, Notice notice) {
            Notice newNotice = new Notice();
            newNotice.setId(noticeId);
            newNotice.setContent(notice.getContent());
    
            noticeMapper.updateByPrimaryKeySelective(newNotice);
        }
    
        @Transactional(rollbackFor = Exception.class)
        public void updateNoticeWithRocketMQLog(Integer noticeId, Notice notice, String transactionId) {
            updateNotice(noticeId, notice);
            //       
            rocketmqTransactionLogMapper.insertSelective(
                    RocketmqTransactionLog.builder()
                            .transactionId(transactionId)
                            .log("updateNotice")
                            .build()
            );
        }
    }

    ローカルトランザクション・リスナーを実装し、トランザクション・メソッドを実行し、ローカル・トランザクション・ステータスのレビュー・メソッドを提供します.具体的なコードは以下の通りです.
    package com.zj.node.contentcenter.rocketmq;
    
    import com.zj.node.contentcenter.dao.log.RocketmqTransactionLogMapper;
    import com.zj.node.contentcenter.domain.entity.content.Notice;
    import com.zj.node.contentcenter.domain.entity.log.RocketmqTransactionLog;
    import com.zj.node.contentcenter.service.test.TestProducerService;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
    import org.apache.rocketmq.spring.support.RocketMQHeaders;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageHeaders;
    
    /**
     *        
     *
     * @author 01
     * @date 2019-08-08
     **/
    @Slf4j
    @RequiredArgsConstructor
    //    txProducerGroup   sendMessageInTransaction      
    @RocketMQTransactionListener(txProducerGroup = "tx-test-producer-group")
    public class TestTransactionListener implements RocketMQLocalTransactionListener {
    
        private final TestProducerService service;
        private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
    
        /**
         *            
         */
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            MessageHeaders headers = msg.getHeaders();
            String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
                    log.info("        .   id: {}", transactionId);
                    // header       String  
            Integer noticeId = Integer.parseInt((String) headers.get("notice_id"));
    
            try {
                //            
                service.updateNoticeWithRocketMQLog(noticeId, (Notice) arg, transactionId);
                //     , MQ Server  commit  
                return RocketMQLocalTransactionState.COMMIT;
            } catch (Exception e) {
                log.error("          ,      ", e);
                //      MQ Server  rollback  
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        }
    
        /**
         *              
         */
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            MessageHeaders headers = msg.getHeaders();
            String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
            log.warn("        .   id: {}", transactionId);
    
            //    id      
            RocketmqTransactionLog transactionLog = rocketmqTransactionLogMapper.selectOne(
                    RocketmqTransactionLog.builder()
                            .transactionId(transactionId)
                            .build()
            );
    
            //       id                ,               
            if (transactionLog == null) {
                log.warn("        ,       ,      .   id: {}", transactionId);
                return RocketMQLocalTransactionState.ROLLBACK;
            }
            return RocketMQLocalTransactionState.COMMIT;
        }
    }

    これらのメソッドの実行プロセスを簡単に説明します.
    まず,TestProducerService.testSendMsgを呼び出してMQサーバに半メッセージを送信し,このメソッドではローカルトランザクションメソッドが実行されないこともコードから分かる.MQ Serverが半メッセージの受信に成功すると、生産者に受信に成功したことを通知し、その後、executeLocalTransactionのトランザクション注釈付きメソッドTestProducerServiceが呼び出され、トランザクションメソッドの実行が完了すると、MQサーバにローカルトランザクションステータスが返されます.updateNoticeWithRocketMQLogメソッドが返すトランザクション状態がexecuteLocalTransactionまたは何らかの理由で実行されていない場合、MQ Serverは二次確認メッセージを受信できず、デフォルトでは1分後に生産者にレビューメッセージを送信し、生産者がレビューメッセージを受信するとローカルトランザクションリスナーのUNKNOWNメソッドが実行されます.トランザクション・ログでテーブルのデータを記録し、トランザクションのステータスを確認して返します.
    RocketMQログ関連ピット
    rocketmqには独自のログシステムがあるため、デフォルトではSlf 4 jは使用されません.checkLocalTransactionメソッドに反映すると、このメソッドの実行中に異常が投げ出された場合、異常情報はコンソールに印刷されずrocketmq_に出力されるclient.logログファイルにあります.関連ソース:executeLocalTransactionrocketmqのログをコンソールに出力する場合は、クラスを起動するmainメソッドに次のコードを追加する必要があります.
    //  rocketmq  slf4j  
    System.setProperty(ClientLogger.CLIENT_LOG_USESLF4J, "true");