SpringBootはRabbitマルチ消費者カットピークを使用

21611 ワード

文書ディレクトリ
  • シーン
  • 構成
  • 生産者送信メッセージ
  • 消費者処理メッセージ
  • シーン
    フロントエンドシステムは大量のデータをプッシュして当方のシステムに入って処理して、当方のシステムの圧力を軽減するため、しかも十分にサーバーの性能を発揮して、処理効率を高めて、そこでRabbitを使って限流処理をして、同時にマルチスレッドが複数の消費者の処理任務を運行して、効率を高めます
    コンフィギュレーション
    Rabbitコンフィギュレーションクラス、残りのインフラストラクチャコンフィギュレーションはコンフィギュレーションファイルまたはコンフィギュレーションセンターに維持されます
    
    /***
     * Rabbit   
     * @author yanqiang.jiang
     * @version 1.0
     * @date 2019/08/26
     **/
    
    @Configuration
    @Slf4j
    public class RabbitConfig {
    
    
        @Autowired
        private CachingConnectionFactory connectionFactory;
    
        @Autowired
        private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
    
        /**
         *     
         *
         * @return   
         */
        @Bean
        public Queue accoflowHs() {
            return new Queue("testQueue");
        }
    
    
        /**
         *      
         *
         * @return SimpleRabbitListenerContainerFactory
         */
        @Bean(name = "singleListenerContainer")
        public SimpleRabbitListenerContainerFactory listenerContainer() {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setMessageConverter(new Jackson2JsonMessageConverter());
            factory.setConcurrentConsumers(1);
            factory.setMaxConcurrentConsumers(1);
            factory.setPrefetchCount(1);
            factory.setTxSize(1);
            factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
            return factory;
        }
    
        /**
         *      
         *
         * @return SimpleRabbitListenerContainerFactory
         */
        @Bean(name = "multiListenerContainer")
        public SimpleRabbitListenerContainerFactory multiListenerContainer() {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factoryConfigurer.configure(factory, connectionFactory);
            factory.setMessageConverter(new Jackson2JsonMessageConverter());
            //           
            factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            //        
            factory.setConcurrentConsumers(10);
            //        
            factory.setMaxConcurrentConsumers(15);
            //      
            factory.setPrefetchCount(5);
            return factory;
        }
    
        /**
         * RabbitTemplate   
         *
         * @return RabbitTemplate
         */
        @Bean
        public RabbitTemplate rabbitTemplate() {
            connectionFactory.setPublisherConfirms(true);
            connectionFactory.setPublisherReturns(true);
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause) -> {
                        log.info("      :correlationData({}),ack({}),cause({})", correlationData, ack, cause);
                    }
            );
            rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey) -> {
                        log.info("    :exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
                    }
            );
            return rabbitTemplate;
        }
    
    
    }
    

    ここで、factory.setMessageConverter(new Jackson2JsonMessageConverter());から送信メッセージは、factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);が手動確認モードfactory.setConcurrentConsumers(10);およびfactory.setMaxConcurrentConsumers(15);を開く必要があることをシーケンス化するために使用される.factory.setPrefetchCount(5);毎回受け取るメッセージの数は、数が多くて効率的ですが、順序が保証されないほど
    生産者がメッセージを送信
    /***
     * rabbit    
     * @author yanqiang.jiang
     * @version 1.0
     * @date 2019/08/26
     **/
    @Component
    @Slf4j
    public class AccoflowHsProducer {
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        /**
         *     
         *
         * @param batchNum    
         */
        public void stringSend(String batchNum) {
            log.info("    :{},    :{}", "testQueue", batchNum);
            //                
            this.rabbitTemplate.convertAndSend("testQueue",
                    MessageBuilder.withBody(batchNum.getBytes(StandardCharsets.UTF_8)).build());
        }
    }
    

    消費者がメッセージを処理する
    /***
     * rabbit    
     * @author yanqiang.jiang
     * @version 1.0
     * @date 2019/08/26
     **/
    @Component
    @Slf4j
    public class AccoflowHsConsumer {
    
        @Autowired
        private AccoflowInfHsHandler accoflowInfHsHandler;
    
        /**
         *         
         *
         * @param msg   
         */
        @RabbitListener(queues = "testQueue" containerFactory = "multiListenerContainer")
        public void recievedString(Message msg, Channel channel) throws Exception {
            try {
                log.info("           {}    ", channel.getChannelNumber());
                PaTransactionTask task = JSON.parseObject(msg.getBody(), PaTransactionTask.class);
                log.info("           {}    :{}", channel.getChannelNumber(), task.getPlanControlId());
                //             
                //       
                runner.excuteTransaction(task);
            } catch (Exception e) {
                log.info("           {}  ", channel.getChannelNumber());
                e.printStackTrace();
            }
            channel.basicAck(msg.getMessageProperties().getDeliveryTag(), true);
            log.info("           {}   ", channel.getChannelNumber());
        }
    }
    

    注意:channel.basicAck(msg.getMessageProperties().getDeliveryTag(), true);処理が完了すると、必ずメッセージを確認しなければならない.そうしないと、次のメッセージの処理は継続しない.また、異常の場合も手動で確認してから処理が完了してから確認するようにしてください.ここでは処理時間が長くなる可能性がありますが、このときメッセージタイムアウトサーバが消費者に再度メッセージを送信するので、ここでは重複処理を防止することをお勧めします.重複消費メッセージを防止します.