九、springbootはrabbiitMQを整合する.


springboot整合rabbiitMQ
概要
  • ラビットMQは、最も広く展開されているオープンソースメッセージエージェントである.
  • ラビットMQライト級で、内部と雲の中に配置しやすい.多くのメッセージ伝達プロトコルをサポートします.RabbiitMQは、大規模で利用可能性の高い要件を満たすために配置され得る.
  • ラビットMQの取り付け
    ブログではLinuxシステムとWindowsシステムのインストールを紹介しています.
  • Linux Linuxにrabitmq 3.7.8をインストールします.
  • win Windows下のRabbiitMQインストール及び注意事項
  • 準備工作
  • 1 pom.xml jar導入:
  •     
            org.springframework.boot
            spring-boot-starter-amqp
        
    
  • appration.ymlファイル構成(ローカルインストールであれば、デフォルトのパスワードポートを使用して、この構成は無視できます)
  • spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest
    
  • メッセージ受信者Receiver
  • @Component
    public class Receiver {
    
        private CountDownLatch latch = new CountDownLatch(1);
    
        public void receiveMessage(String message) {
            System.out.println("Received ");
            latch.countDown();
        }
    
        public CountDownLatch getLatch() {
            return latch;
        }
    
    }
    
  • 4ラビットMqConfigファイルを書く
  • @Configuration
    public class RabbitMqConfig {
        final static String queueName = "spring-boot";
    
        @Bean
        Queue queue() {
            return new Queue(queueName, false);
        }
    
        @Bean
        TopicExchange exchange() {
            return new TopicExchange("spring-boot-exchange");
        }
    
        @Bean
        Binding binding(Queue queue, TopicExchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with(queueName);
        }
    
        @Bean
        SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                                 MessageListenerAdapter listenerAdapter) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames(queueName);
            container.setMessageListener(listenerAdapter);
            return container;
        }
    
        @Bean
        MessageListenerAdapter listenerAdapter(Receiver receiver) {
            return new MessageListenerAdapter(receiver, "receiveMessage");
        }
    }
    
  • メッセージ送信者Runner
  • @Component
    public class Runner implements CommandLineRunner {
        final static String queueName = "spring-boot";
        private final RabbitTemplate rabbitTemplate;
        private final Receiver receiver;
        private final ConfigurableApplicationContext context;
    
        public Runner(Receiver receiver, RabbitTemplate rabbitTemplate,
                      ConfigurableApplicationContext context) {
            this.receiver = receiver;
            this.rabbitTemplate = rabbitTemplate;
            this.context = context;
        }
    
        @Override
        public void run(String... args) throws Exception {
            System.out.println("Sending message...");
            rabbitTemplate.convertAndSend(queueName, "Hello from RabbitMQ!");
            receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
            context.close();
        }
    }
    
    
  • プロジェクトの運行は以下の結果を見ました.
    Sending message...
    Received 
    
    
    これは簡単なspringboot rabbiitMQ統合です.もちろん、rabitMQのサポートはそれだけではなく、遅延行列も提供しています.
    遅延列とは何ですか?遅延列はどのような場面に適用されますか?
    遅延列は名前の通り、すぐに消費する必要はないというメッセージではなく、しばらく待ってから消費を取り出す.なぜ消費を遅らせる必要がありますか?次のような場面を見に来ました.
  • オンラインショッピングセンターで注文してから30分後に支払いが完了しませんでした.注文をキャンセルします.
  • システムは予約を作成した後、予約時間の1時間前に予約された双方の参加を通知する必要があります.
  • システムの業務が失敗したら、再試行が必要です.
    Rabbitmq実施遅延行列は、一般的に2つの形態がある.
  • 第1の方法:2つの特性を利用する:Time To Live(TTL)、Dead Letter Exchanges(DLX)
  • 第2の方法:rabitmqにおけるプラグインx-delay-message
  • を利用する.
    TTL DLXを利用して遅延列を実現する方式
    TTL DXとは何ですか
  • TTL RabbirtMQは、キューに対してx-expiresを設定してもよく、タイムアウト(両方とも最初の満了時間を基準として設定しても良い)、またはメッセージに対してx-message-ttlを設定してもよく、メッセージの生存時間を制御してもよい.
  • Dead Letter Exchange(DLX)RabbiitMQのQueは、x-dead-letter-exchangeとx-dead-letter-routing-keyの2つのパラメータを設定することができ、列内にdead letterがあれば、この2つのパラメータに従って指定されたキューに再ルーティングする.x-dead-letter-exchange:dead letterが現れたら、dead letterを指定されたexchange x-dead-letter-routing-keyに再送信します.dead letterが現れたら、dead letterを再度指定されたrouting-keyに従って
  • を送ります.
    第一の方式を実現して直接コードを貼ります.
  • は、Rabbiit MqConfig(queueとexchangeとの間のbinding関係を初期化する)
  • を作成する.
    @Configuration
    public class RabbitMqConfig {
    
        /**
         *           
         * @return
         */
        @Bean
        public Queue immediateQueue() {
            //          queue   ,             
            return new Queue(Constants.IMMEDIATE_QUEUE, true);
        }
    
        /**
         *         
         * @return
         */
        @Bean
        public Queue delayQueue() {
            Map params = new HashMap<>();
            // x-dead-letter-exchange              DLX  ,
            params.put("x-dead-letter-exchange", Constants.IMMEDIATE_EXCHANGE);
            // x-dead-letter-routing-key                routing-key   。
            params.put("x-dead-letter-routing-key", Constants.IMMEDIATE_ROUTING_KEY);
            return new Queue(Constants.DELAY_QUEUE, true, false, false, params);
        }
    
        @Bean
        public DirectExchange immediateExchange() {
            //          :
            //    ,    exchange   ,
            //    ,   exchange  ,       ,        ,
            //    ,           Map,Map        exchange    
            return new DirectExchange(Constants.IMMEDIATE_EXCHANGE, true, false);
        }
    
        @Bean
        public DirectExchange deadLetterExchange() {
            //          ,    exchange   ,    ,   exchange  ,       ,        ,
            //              Map,Map        exchange    
            return new DirectExchange(Constants.DEAD_LETTER_EXCHANGE, true, false);
        }
    
        /**
         *               exchange     
         * @return
         */
        @Bean
        public Binding immediateBinding() {
            return BindingBuilder.bind(immediateQueue()).to(immediateExchange()).with(Constants.IMMEDIATE_ROUTING_KEY);
        }
    
        /**
         *               exchange     
         * @return
         */
        @Bean
        public Binding delayBinding() {
            return BindingBuilder.bind(delayQueue()).to(deadLetterExchange()).with(Constants.DELAY_ROUTING_KEY);
        }
    }
    
    
  • 生産者
  • @Component
    public class Sender {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * Description :       
         * Group :
         *
         * @author honghh
         * @date  2019/3/8 0008 18:03
         * @param msg
         * @param delayTime
         */
        public void send(String msg, int delayTime) {
            System.out.println("delayTime:[ms]" + delayTime);
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            this.rabbitTemplate.convertAndSend(
                    Constants.DEAD_LETTER_EXCHANGE,
                    Constants.DELAY_ROUTING_KEY,
                    msg,
                    message -> {
                message.getMessageProperties().setExpiration(delayTime + "");
                System.out.println(sdf.format(new Date()) + "     .");
                return message;
            });
        }
    }
    
  • 消費者Receiver
  • @Component
    @EnableRabbit
    @Configuration
    public class Receiver {
    
        @RabbitListener(queues = Constants.IMMEDIATE_QUEUE)
        @RabbitHandler
        public void get(String msg) {
            System.out.println(new Date() + "        : " +  msg);
        }
    }
    
  • 試験類
  •  @Autowired
        Sender sender;
    
        @Test
        public void contextLoads() {
            sender.send("msg--send", 10000);
        }
    
    
  • 起動項目はコンソールの印刷効果を見ます.
    delayTime:[ms]10000
    2019-03-08 18:30:40     .
    
    
    
    Fri Mar 08 18:30:50 CST 2019        : msg--send
    
    ##            
    
    コード取得
    https://gitee.com/honghh/boot-demo.git
    参考文献
    https://spring.io/guides/gs/messaging-rabbitmq/