Spring-boot-route(十三)統合RabbitMQ


この記事はSpringBoot統合メッセージキューの最初の記事で、メッセージキューに関する内容を詳しく説明します.
メッセージキューの概要
1.メッセージキューとはMQ(Message Quene):典型的な生産者および消費者モデルによって、生産者はメッセージキューにメッセージを生成し続け、消費者はキューからメッセージを取得し続ける.生産者と消費者は非同期であり、生産者はメッセージの送信だけに関心を持ち、消費者はメッセージの受信だけに関心を持ち、ビジネスロジックの侵入がなく、簡単にビジネスのデカップリングを実現するためである.
2.メッセージ・キューの使用方法
  • 非同期処理
  • シーンの説明:あるデパートは登録機能を持っていて、登録する時にメールの検証コードを送る必要があります.
    従来の方法は、ユーザーがユーザーサービスに情報を提出し、ユーザーサービスがメールサービスを呼び出してメールを送信し、ユーザーに応答を返すことであり、これは同期的な処理方式であり、時間がかかる.メッセージキューに参加すると,ユーザは直接ユーザサービスに情報を送信し,メッセージキューに情報を書き込み,直接ユーザに応答を返し,メールサービスはメッセージキューからメッセージを読み出してメールを送信する.
  • 応用デカップリング
  • シーンの説明:あるデパートの注文の流れ.
    従来の方法はユーザーが注文し、注文システムは在庫システムを照会し、在庫システムがダウンした場合、注文に失敗し、注文量を損失する.メッセージキューに参加すると、ユーザーが注文し、注文システムが注文を記録し、注文情報をメッセージキューに書き込み、注文に成功し、在庫システムが正常に戻った後、データベース在庫を操作します(在庫が0の場合は考慮しません).このように受注システムと在庫システムは緩やかな結合の目的を達成した.
  • 流量カットピーク
  • シーンの説明:秒殺活動.
    トラフィックが大きすぎると、応答がタイムアウトしたり、システムがダウンしたりして、メッセージキューに参加したり、ユーザーの秒殺要求がメッセージキューに書き込まれたり、メッセージキューの長さなどの属性を設定し、メッセージキューの最大長に達したら、直接秒殺に失敗したことに戻り、メッセージキューのデータを消費して秒殺を完了します.
    RabbitMQの概要
    RabbitMQはErlang言語で記述され,高度なメッセージキュープロトコル(AMQP)のメッセージミドルウェアを実現した.
    1.AMQPプロトコル概念AMQP:AMQPは、ネットワーク交換のデータフォーマットを直接定義するリンクプロトコルであり、AMQPを実現したprovider自体がプラットフォーム間である.AMQPプロトコルモデルを次に示します.
  • server-brokerとも呼ばれ、クライアントのリンクを受信し、amqpエンティティサービスを実現する.
  • Connection-リンク、アプリケーションとbrokerのネットワークリンク.
  • channel−ネットワークチャネルは、ほとんどの動作がchannelで行われ、データのフローはchannel上で行われる.channelはメッセージの読み書きを行うチャネルです.クライアントは、セッションタスクを表す複数のチャネルを確立することができます.
  • message-サーバとアプリケーション間で転送されるデータ.propertiesとbodyからなる.propertiesは、メッセージのアップグレード、遅延などの高度な特性など、メッセージを修飾することができます.bodyはメッセージ体の内容です.
  • virtual host-仮想ホストは、論理分離、最上位レベルのメッセージルーティングを行うために使用され、1つの仮想アドレスに複数のスイッチがあることができます.Exchangeとメッセージキューmessage quene.
  • exchange-スイッチは、メッセージを受信し、ルータに従ってバインドされたキューにメッセージを転送します.
  • binding-バインド、スイッチとキュー間の仮想リンク、バインドにrouting keyを含めることができます.
  • routing key-仮想マシンがjiekyiが特定のメッセージをどのようにルーティングするかを決定するために使用できるルーティングルール.
  • quene-メッセージキュー.メッセージを保存し、消費者に転送します.

  • 2.RabbitMQのメッセージモデル
    1.単純モデル
    上の図では、
  • p:生成者
  • C:消費者
  • 赤色部分:quene、メッセージキュー
  • 2.ワークモデル
    上の図では、
  • p:生成者
  • C 1、C 2:消費者
  • 赤色部分:quene、メッセージキュー
  • メッセージ処理に時間がかかると、生産メッセージの速度が消費メッセージの速度よりはるかに大きくなり、メッセージの蓄積が発生し、タイムリーに処理できない.この場合、複数の消費者に1つのキューをバインドしてメッセージを消費させることができ、キュー内のメッセージは消費されると失われるため、タスクは繰り返し実行されません.
    3.ブロードキャストモデル(fanout)
    このモデルでは,生産者が送信するメッセージはすべての消費者が消費できる.
    上の図では、
  • p:生成者
  • X:スイッチ
  • C 1、C 2:消費者
  • 赤色部分:quene、メッセージキュー
  • 4.ルーティングモデル(routing)
    このモデル消費者が送信するメッセージは、異なるタイプのメッセージが異なる消費者によって消費されることができる.
    上の図では、
  • p:生成者
  • X:スイッチは、生産者のメッセージを受信するとrouting keyが完全に一致するキュー
  • にメッセージを送信する.
  • C 1、C 2:消費者
  • 赤色部分:quene、メッセージキュー
  • 5.モデルの購読(topic)
    このモデルはdirectモデルと同様にrouting keyに基づいてメッセージを異なるキューにルーティングすることができるが、このモデルはキューがrouting keyをバインドするときにワイルドカードを使用することができる.このタイプのrouting keyはいずれも1つ以上の単語からなり、複数の単語の間を.で分割する.
    ワイルドカードの説明:*:単語が1つしか一致しません#:1つ以上の単語を一致させる
    6.RPCモデル
    このモードでは、リモートコンピュータに機能の実行を通知し、実行結果の戻るのを待つ必要があります.この過程はブロックされている.
    クライアントが起動すると、匿名の排他コールバックキューが作成されます.RPC要求を送信し、RPC演算の結果が受信されるまでブロックするcallという名前の関数を提供します.
    Spring Boot統合RabbitMQ
    ステップ1:pom依存性の導入
    
        org.springframework.boot
        spring-boot-starter-amqp
    

    ステップ2:RabbitMQサービス構成情報の追加
    spring:
      rabbitmq:
        virtual-host: javatrip
        port: 5672
        host: 127.0.0.1
        username: guest
        password: guest

    ここでは、ブロードキャストモデルを例に挙げて使用します.ブロードキャストモデル(fanout)は理解しやすいです.公衆番号のように、私は毎日文章を押した後、関心のあるユーザーにプッシュします.彼らはこのメッセージを見ることができます.
    ブロードキャストモデルの注意点:
  • は、複数のキュー
  • を有することができる.
  • 各キューはスイッチ
  • をバインドする必要がある
  • 消費者ごとに独自のキューがある
  • スイッチは、バインドされたすべてのキュー
  • にメッセージを送信する.
    1.2つのキューを定義
    @Configuration
    public class RabbitConfig {
    
        final static String queueNameA = "first-queue";
        final static String queueNameB = "second-queue";
    
        /***
         *       ,      
         * @return
         */
        @Bean("queueA")
        public Queue queueA(){
    
            Map map = new HashMap<>();
            //       ,10   
            map.put("x-message-ttl",10000);
            //          ,10 
            map.put("x-max-length",10);
            //      ,    
            //      ,durable:   
            //      ,exclusive:   ,
            //      ,autoDelete:    
            Queue queue = new Queue(queueNameA,true,false,false,map);
            return queue;
        }
        
        @Bean("queueB")
        public Queue queueB(){
    
            Map map = new HashMap<>();
            //       ,10   
            map.put("x-message-ttl",10000);
            //          ,10 
            map.put("x-max-length",10);
            //      ,    
            //      ,durable:   
            //      ,exclusive:   ,
            //      ,autoDelete:    
            Queue queue = new Queue(queueNameB,true,false,false,map);
            return queue;
        }
    }
    

    2.セクタスイッチの定義
    @Bean
    public FanoutExchange fanoutExchange(){
    
        //      ,     
        //      ,durable,     
        //      ,autoDelete,      
        FanoutExchange fanoutExchange = new FanoutExchange(exchangeName,true,false);
        return fanoutExchange;
    }

    3.スイッチとキューバインディング
    @Bean
    public Binding bindingA(@Qualifier("queueA") Queue queueA, FanoutExchange fanoutExchange){
        Binding binding = BindingBuilder.bind(queueA).to(fanoutExchange);
        return binding;
    }
    
    @Bean
    public Binding bindingB(@Qualifier("queueB") Queue queueB,FanoutExchange fanoutExchange){
        Binding binding = BindingBuilder.bind(queueB).to(fanoutExchange);
        return binding;
    }

    4.2つの消費者がそれぞれ2つのキューを傍受するように作成
    @RabbitListener(queues = RabbitConfig.queueNameA)
    @Component
    @Slf4j
    public class ConsumerA {
    
        @RabbitHandler
        public void receive(String message){
            log.info("   A      :"+message);
        }
    }
    @RabbitListener(queues = RabbitConfig.queueNameB)
    @Component
    @Slf4j
    public class ConsumerB {
    
        @RabbitHandler
        public void receive(String message){
            log.info("   B      :"+message);
        }
    }

    5.生産者生産メッセージの作成
    @RestController
    public class provider {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @GetMapping("send")
        public void sendMessage(){
    
            String message = "  ,  Java  ";
            rabbitTemplate.convertAndSend(RabbitConfig.exchangeName,null,message);
        }
    }

    これにより、生産者が1つのメッセージを送信すると、2人の消費者が同時にメッセージを消費することができます.
    本明細書のサンプルコードはgithubにアップロードされ、starポイントでサポートされています.
    Spring Bootシリーズチュートリアルディレクトリ
    Spring-boot-route(一)Controller受信パラメータのいくつかの方式
    Spring-boot-route(二)プロファイルを読み取るいくつかの方法
    Spring-boot-route(3)マルチファイルアップロードを実現
    Spring-boot-route(四)グローバル異常処理
    Spring-boot-route(5)swagger生成インタフェースドキュメントの統合
    Spring-boot-route(六)統合JApiDocs生成インタフェースドキュメント
    Spring-boot-route(7)jdbcTemplate操作データベースの統合
    Spring-boot-route(8)mybatisオペレーションデータベースの統合
    Spring-boot-route(九)JPA操作データベースの統合
    Spring-boot-route(10)マルチデータソース切替
    Spring-boot-route(十一)データベース構成情報暗号化
    Spring-boot-route(12)キャッシュとしてredisを統合
    Spring-boot-route(十三)統合RabbitMQ
    Spring-boot-route(14)統合Kafka
    Spring-boot-route(15)統合RocketMQ
    Spring-boot-route(16)logbackを使用した本番ログファイル
    Spring-boot-route(十七)aopを使用して操作ログを記録する
    Spring-boot-route(18)spring-boot-adtuatorモニタリング応用
    Spring-boot-route(19)spring-boot-adminモニタリングサービス
    Spring-boot-route(20)Spring Task簡単なタイミングタスクを実現
    Spring-boot-route(21)quartz動的タイミングタスクの実現
    Spring-boot-route(二十二)メール送信機能を実現
    Spring-boot-route(二十三)微信公衆番号の開発
    このシリーズの文章はすべて仕事の中で頻繁に使う知識で、このシリーズを学んで、日常の開発に対処するのは余裕があります.他にも知りたいことがあれば、下のQRコードをスキャンして教えてください.私はこのシリーズの文章をさらに改善します.