RabbitMQは浅入から門に深く入り込む全総括(二)

12952 ワード

いちばん前に書く
前回文章を出してからもうずいぶん経ちましたが、実はこの时間はずっとペンを止めていません.ただ、仕事を探したり、学校の授業を終えたりするのに忙しいだけです.ブログをやり直して、後で文章を最近更新します.
  • この文章は少し長いので、2編の
  • に分かれています.
  • PS:あのGithubでJavaの知識問答の文章も止めず、最近は
  • も続々と更新されています
    6.ステップアップ補充
    6.1有効期限設定(TTL)
    有効期限(TTL)は、メッセージまたはキューに時効を設定し、消費者に受信されて取得されるのは時間範囲内であり、有効期限を超えると自動的にメッセージが削除される.
    注意:主にメッセージの期限切れについて説明します.メッセージの期限切れの最初の方法では、キューの期限切れの設定方法についても言及します.
  • キュー属性の設定により、キュー内のすべてのメッセージに同じ期限切れ
  • がある.
  • はメッセージを個別に設定し、各メッセージTTLは
  • と異なることができる.
    2つの方法が同時に使用される場合、両者の有効期限TTLが小さい値を基準とする.メッセージのキュー生存時間が設定されたTTL値を超えると、Dead Messageと呼ばれるデッドメッセージキューに送信され、消費者はメッセージを受信できなくなります(デッドメッセージキューは次の点です).
    6.1.1すべてのメッセージに適用される有効期限
  • 構成クラス
  • @Configuration
    public class RabbitMqConfiguration {
    
        public static final String TOPIC_EXCHANGE = "topic_order_exchange";
        public static final String TOPIC_QUEUE_NAME_1 = "test_topic_queue_1";
        public static final String TOPIC_ROUTINGKEY_1 = "test.*";
    
        @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange(TOPIC_EXCHANGE);
        }
    
        @Bean
        public Queue topicQueue1() {
            //      Map   
            Map args = new HashMap<>();
            //                  5000      
            args.put("x-message-ttl", 5000);
            //         
            args.put("x-expires", 8000);
            //                  
            return new Queue(TOPIC_QUEUE_NAME_1, true, false, false, args);
        }
    
        @Bean
        public Binding bindingTopic1() {
            return BindingBuilder.bind(topicQueue1())
                    .to(topicExchange())
                    .with(TOPIC_ROUTINGKEY_1);
        }
    }
  • パラメータMapコンテナの作成:タイプはQueueパラメータで要求され、要求に従って作成されます.
  • メッセージの有効期限を設定します.ここで設定したメッセージの有効期限は、すべてのメッセージに適用されます.
  • キュー有効期限
  • を設定する.
  • 入力追加パラメータ:上記構成の有効期限設定をQueue入力で入力します.
  • 生産者
  • @SpringBootTest(classes = RabbitmqSpringbootApplication.class)
    @RunWith(SpringRunner.class)
    public class RabbitMqTest {
        /**
         *    RabbitTemplate
         */
        @Autowired
    
        @Test
        public void testTopicSendMessage() {
            rabbitTemplate.convertAndSend(RabbitMqConfiguration.TOPIC_EXCHANGE, "test.order.insert", "This is a message !");
        }
    }

    消費者を構成しないで、Webマネージャで効果を見ることができます.
    6.1.2個別メッセージに適用される有効期限
  • の構成で最初のままで良いので、構成の有効期限が切れる
  • は不要です.
  • 生産者における構成メッセージの個別の有効期限
  • .
    @SpringBootTest(classes = RabbitmqSpringbootApplication.class)
    @RunWith(SpringRunner.class)
    public class RabbitMqTest {
        /**
         *    RabbitTemplate
         */
        @Autowired
    
        @Test
        public void testTopicSendMessage2() {
            MessagePostProcessor messagePostProcessor = new MessagePostProcessor(){
                public Message postProcessMessage(Message message){
                    //           “5000”
                    message.getMessageProperties().setExpiration("5000");
                    message.getMessageProperties().setContentEncoding("UTF-8");
                    return message;
                }
            };
            rabbitTemplate.convertAndSend(RabbitMqConfiguration.TOPIC_EXCHANGE, "test.order",
                    "This is a message 002 !",messagePostProcessor);
        }
    }

    6.2デッドメッセージキュー
    デッドレター公式の原文はDead letterで、RabbitMQのメッセージメカニズムです.メッセージを消費している間に、キューとキューのメッセージに以下の状況が発生した場合、現在のメッセージが「デッドレター」になっていることを示しています.デッドレターキューを構成すると、これらのデータが転送され、構成がなければ直接破棄されます.
  • メッセージが拒否された
  • メッセージ失効
  • キュー最大長
  • しかし、死信キューは特別な存在ではありません.私たちはスイッチを構成するだけで、消費されたキューの中で構成する必要があります.死信が現れたら、さっき構成したスイッチに再送信し、スイッチにバインドされたキューにルーティングされます.このキュー、つまり死信キューなので、作成から見ると、普通のキューと変わらないです.
    6.2.1シーンを適用
    例えば、いくつかの重要なビジネス・キューでは、正しく消費されていないメッセージは、破棄後にこれらのデータを復元するには、メンテナンス担当者がログから元のメッセージを取得し、メッセージを再送信する必要があるため、デッドライン・キューを構成し、正しく消費されていないメッセージに一時的な位置を与え、後でリカバリが必要な場合に相当します.対応するコードを書くだけでいいです.
    6.2.2実現方式
  • は、デッドラインを処理するスイッチおよびキュー
  • を定義する.
    @Configuration
    public class DeadRabbitMqConfiguration{
    
        @Bean
        public DirectExchange deadDirect(){
            return new DirectExchange("dead_direct_exchange");}
    
        @Bean
        public Queue deadQueue(){
            return new Queue("dead_direct_queue");}
        @Bean
        public Binding deadBinds(){
            return BindingBuilder.bind(deadQueue()).to(deadDirect()).with("dead");
        }
    }
  • 通常の消費キューにおいてデッドメッセージキュー
  • を指定する.
    @Configuration
    public class RabbitMqConfiguration {
    
        public static final String TOPIC_EXCHANGE = "topic_order_exchange";
        public static final String TOPIC_QUEUE_NAME_1 = "test_topic_queue_1";
        public static final String TOPIC_ROUTINGKEY_1 = "test.*";
    
        @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange(TOPIC_EXCHANGE);
        }
    
        @Bean
        public Queue topicQueue1() {
            //       
            Map args = new HashMap<>();
            args.put("x-message-ttl", 5000);
            //          
            args.put("x-dead-letter-exchange","dead_direct_exchange");
            //          key fanout          
            args.put("x-dead-letter-routing-key","dead");
            return new Queue(TOPIC_QUEUE_NAME_1, true, false, false, args);
        }
    
        @Bean
        public Binding bindingTopic1() {
            return BindingBuilder.bind(topicQueue1())
                    .to(topicExchange())
                    .with(TOPIC_ROUTINGKEY_1);
        }
    }

    6.3メモリとディスクの監視
    6.3.1メモリアラームおよび制御
    サーバがメモリ不足でクラッシュしないようにするため、RabbitMQは閾値を設定し、メモリ使用量が閾値を超えると、RabbitMQはすべてのクライアントの接続を一時的にブロックし、新しいメッセージの受信を停止します.
    このしきい値を変更するには2つの方法があります
  • 命令を通す(二択一でよい)
  • コマンドは、Brokerが再起動すると
  • に無効になります.
    #                        0.6
    rabbitmqctl set_vm_memory_high_watermark 
    #                           700MB
    rabbitmqctl set_vm_memory_high_watermark absolute 
  • プロファイルrabbitmqを変更する.conf
  • プロファイルは起動するたびにロードされ、永続的に有効な
  • に属します.
    #            0.4    0.4-0.7   
    vm_memory_high_watermark.relative = 0.5
    #      
    vm_memory_high_watermark.absolute = 2GB

    6.3.2メモリスワップ
    クライアント接続とプロバイダがブロックされる前に、キュー内のメッセージをディスクに変更しようとします.この考え方は、オペレーティングシステムでは、メッセージの正常な処理を最大限に満たすためによく見られます.
    メモリのページングが発生すると、永続化または非永続化のメッセージはディスクに転送されますが、永続化されたメッセージはもともとディスクに永続化されたコピーがあるため、永続化されたメッセージは優先的に削除されます.
    デフォルトでは、メモリがしきい値の50%に達すると、ページ変更処理が行われます.
    vm_を設定することでmemory_high_watermark_paging_ratio修正
    #     1,      1       
    vm_memory_high_watermark_paging_ratio = 0.6

    6.3.3ディスク・アラート
    限りなくページを変更すると、ディスク容量が消費されてサーバがクラッシュする可能性が高いため、RabbitMQはディスクアラートのしきい値を提供し、この値を下回るとアラームが発生します.デフォルトは50 MBです.コマンドで変更できます.
    #    
    rabbitmqctl set_disk_free_limit 
    #    
    rabbitmqctl set_disk_free_limit memory_limit 

    6.4メッセージの確実な伝達
    生産者がRabbitMQにメッセージを送信する場合、ネットワークなど様々な原因で送信に失敗する可能性があるため、RabbitMQはメッセージの確実な伝達を保証する一連のメカニズムを提供し、生産者と消費者の2つの部分の処理に大別することができる.
    6.4.1生産者における仕組み
    生産者はメッセージの送信者として、自分のメッセージの送信に成功することを保証する必要があります.RabbitMQはこの点を保証する2つの方法を提供しています.-
  • confirm確認モード
  • return返却モード
  • 6.4.1.1 confirm確認モード
    生産者がメッセージを送信すると、ack応答の受信を非同期で待機し、返されたack確認メッセージを受信した後、ackがtrueかfalseかに基づいてconfirmCallbackインタフェースを呼び出して処理する
  • 構成クラス
  • spring:
      rabbitmq:
        #     
        publisher-confirm-type: correlated
  • ConfirmCallbackインタフェースを実現するconfirmメソッド
  • @Component
    public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
    
        /**
         * @param correlationData       
         * @param ack             exchange             。true   ,false    
         * @param cause               
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (ack) {
                //    
                System.out.println("          ");
            } else {
                //    
                System.out.println("          ,    : " + cause);
                // TODO          ,        
            }
        }
    }
  • キューとスイッチ
  • を宣言
    @Configuration
    public class RabbitMqConfig {
        @Bean()
        public Queue confirmTestQueue() {
            return new Queue("confirm_test_queue", true, false, false);
        }
    
        @Bean()
        public FanoutExchange confirmTestExchange() {
            return new FanoutExchange("confirm_test_exchange");
        }
    
        @Bean
        public Binding confirmTestFanoutExchangeAndQueue() {
            return BindingBuilder.bind(confirmTestQueue()).to(confirmTestExchange());
        }
    }
  • 生産者
  • @SpringBootTest(classes = RabbitmqSpringbootApplication.class)
    @RunWith(SpringRunner.class)
    public class RabbitMqTest {
        /**
         *    RabbitTemplate
         */
        @Autowired
        
         /**
         *    ConfirmCallbackService
         */
        @Autowired
        private ConfirmCallbackService confirmCallbackService;
    
        @Test
        public void testConfirm() {
            //        
            rabbitTemplate.setConfirmCallback(confirmCallbackService);
            //     
            rabbitTemplate.convertAndSend("confirm_test_exchange", "", "ConfirmCallback !");
        }
    }

    6.4.1.2 return返却モード
    ExchangeがQueueに送信されて失敗すると、returnsCallbackが呼び出され、このインタフェースを実装して失敗を処理することができます.
  • プロファイルでコールバック
  • の送信を開始する.
    spring:
      rabbitmq:
        #     
        publisher-returns: true
  • ReturnsCallbackを実現するreturnedMessage法
  • //  public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey)          
    @Component
    public class ReturnCallbackService implements RabbitTemplate.ReturnsCallback {
        @Override
        public void returnedMessage(ReturnedMessage returned) {
            System.out.println(returned);
        }
    }
  • キューとスイッチ(Directモード)
  • を宣言
    @Configuration
    public class RabbitMqConfig {
    
        @Bean()
        public Queue returnsTestQueue() {
            return new Queue("return_test_queue", true, false, false);
        }
    
        @Bean()
        public DirectExchange returnsTestExchange() {
            return new DirectExchange("returns_test_exchange");
        }
    
        @Bean
        public Binding returnsTestDirectExchangeAndQueue() {
            return BindingBuilder.bind(returnsTestQueue()).to(returnsTestExchange()).with("info");
        }
    }
  • 生産者
  • @SpringBootTest(classes = RabbitmqSpringbootApplication.class)
    @RunWith(SpringRunner.class)
    public class RabbitMqTest {
        /**
         *    RabbitTemplate
         */
        @Autowired
        
        /**
         *    ConfirmCallbackService
         */
        @Autowired
        private ConfirmCallbackService confirmCallbackService;
        
        /**
         *    ReturnCallbackService
         */
        @Autowired
        private ReturnCallbackService returnCallbackService;
    
        @Test
        public void testReturn() {
            //                    
            rabbitTemplate.setMandatory(true);
            //              
            rabbitTemplate.setReturnsCallback(returnCallbackService);
            //         
            rabbitTemplate.setConfirmCallback(confirmCallbackService);
            //     
            rabbitTemplate.convertAndSend("returns_test_exchange", "info", "ReturnsCallback !");
        }
    }
  • 異なるルーティングkeyを変更すると、結果がテストされます.

  • 6.4.2消費者におけるメカニズム
    6.4.2.1 ack確認メカニズム
    ackは受信メッセージの確認を表し、デフォルトは自動確認ですが、3種類あります.
    acknowledge-modeオプションの紹介
  • auto:自動確認、デフォルトオプション
  • manual:手動確認(能力配分による手動確認が必要)
  • none:確認せず、送信後自動破棄
  • 自動確認とは、メッセージが消費者に受信されると、自動的に受信を確認し、そのメッセージをキューから削除することである.
    しかし、実際の業務処理では、正しく受信メッセージは、業務上の問題により正しく処理されない可能性があるが、手動確認方式が設定されている場合は、業務処理に成功した後にchannelを呼び出す必要がある.basicAck()は、手動で署名し、異常が発生した場合はchannelを呼び出す.basicNack()メソッドは、メッセージを自動的に再送信させる.
  • プロファイル
  • spring:
      rabbitmq:
        listener:
          simple:
              #     
            acknowledge-mode: manual 
  • 消費者
  • @Component
    @RabbitListener(queues = "confirm_test_queue")
    public class TestConsumer {
    
        @RabbitHandler
        public void processHandler(String msg, Channel channel, Message message) throws IOException {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try {
                System.out.println("    : " + new String(message.getBody()));
    
                System.out.println("       :");
                int i = 66 / 0;
                
                //      deliveryTag           
                channel.basicAck(deliveryTag, true);
            } catch (Exception e) {
                //     
                channel.basicNack(deliveryTag, true, true);
            }
        }
    }

    6.5クラスタ&6.6分散トランザクション(更新対象)
    この二つの点も短くないので、簡単に書きたくないので、後ろに置いて単独の文章を書いて、発表します.
    クラスタの構築については、https://blog.csdn.net/belongh...を参照してください.