rabbitmqを用いてメッセージを手動で確認する場合、タイミング取得キューメッセージ実装

39675 ワード

問題の説明
最近のプロジェクトでは、データが増加し、サードパーティシステムと関連したインタラクションが必要なため、サードパーティシステムにプッシュする必要があるデータがあります.
関連業務
オンラインの稼働効率に影響を与えないという考え方に基づいて、私たちは増加したメッセージをrabbitmqに入れ、別のアプリケーションを使用して消費を取得します.データはプッシュするだけで、業務のデータは15分ほどの更新戦略があり、リアルタイム性はあまり高くありません.だから、rabbitをアクティブにリンクして消費し、データをネットワークで転送する必要があります.
そうかんぶんせき
ネットワーク上ではほぼ関連解決策が現れているが、関連データの紛失や処理、性能と効率などの関連基礎業務のワークロードを実現するため、後ずさりしている...
まだspringにはorgがあるspringframework.amqpキット、簡略化された大量の面倒>>始めましょう
rabbitに関するいくつかの概念を理解する
  • Spring RabbitMQ Channel理解
  • ミドルウェアシリーズ2 RabbitMQのメッセージ持続性、確認メカニズム、拒否、プリフェッチ数、割当ポリシー
  • topic

  • このいくつかの概念を理解した時、あなたはすでに私たちの今日のテーマに注目しているかもしれません.
    S i m p l e M e s a g e ListenerContainerコンテナを使用して消費キューリスニングを設定し、次に特定のリスニングListenerを設定してメッセージ消費の具体的なロジックの作成を行います.
    しかし、このコンテナを使用して大量に消費する方法については、公式には説明されていません.ネット上では、このS i m p l e M s s a g e ListenerContainerの大量メッセージ処理しか見つからないかもしれませんが、答えは簡単です.
    次はこの質問の答えについてcodingをします.
    解決策
    まず失敗した再試行が必要なのでspringのRepublishMessageRecoverer , , 。 を使用します
    関連構成
      1     @Bean
      2     ObjectMapper objectMapper() {
      3         ObjectMapper objectMapper = new ObjectMapper();
      4         DateFormat dateFormat = objectMapper.getDateFormat();
      5         JavaTimeModule javaTimeModule = new JavaTimeModule();
      6 
      7         SimpleModule module = new SimpleModule();
      8         module.addSerializer(new ToStringSerializer(Long.TYPE));
      9         module.addSerializer(new ToStringSerializer(Long.class));
     10         module.addSerializer(new ToStringSerializer(BigInteger.class));
     11 
     12         javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
     13         javaTimeModule.addSerializer(LocalDate.class, new LocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
     14         javaTimeModule.addSerializer(LocalTime.class, new LocalTimeSerializer(DateTimeFormatter.ofPattern("HH:mm:ss")));
     15 
     16         objectMapper.registerModule(module);
     17         objectMapper.registerModule(javaTimeModule);
     18         objectMapper.setConfig(objectMapper.getDeserializationConfig().with(new ObjectMapperDateFormatExtend(dateFormat)));//            
     19         objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
     20         objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
     21         return objectMapper;
     22 }
     23 
     24 
     25 
     26   @Bean
     27   RabbitAdmin admin (ConnectionFactory aConnectionFactory) {
     28     return new RabbitAdmin(aConnectionFactory);
     29   }
     30 
     31   @Bean
     32   MessageConverter jacksonAmqpMessageConverter( ) {
     33     return new Jackson2JsonMessageConverter(objectMapper());
     34   }
     35 
     36 
     37   @Bean
     38   Queue bcwPushControlQueue (RabbitAdmin rabbitAdmin) {
     39     Queue queue = new Queue(Queues.QUEUE_BCW_PUSH);
     40     rabbitAdmin.declareQueue(queue);
     41     return queue;
     42   }
     43   @Bean
     44   Queue bcwPayControlQueue (RabbitAdmin rabbitAdmin) {
     45     Queue queue = new Queue(Queues.QUEUE_BCW_PAY);
     46     rabbitAdmin.declareQueue(queue);
     47     return queue;
     48   }
     49   @Bean
     50   Queue bcwPullControlQueue (RabbitAdmin rabbitAdmin) {
     51     Queue queue = new Queue(Queues.QUEUE_BCW_PULL);
     52     rabbitAdmin.declareQueue(queue);
     53     return queue;
     54   }
     55     /**
     56      *        
     57      * @return
     58      */
     59   @Bean
     60   TopicExchange controlExchange () {
     61       return new TopicExchange(Exchanges.ExangeTOPIC);
     62   }
     63 
     64 
     65     /**
     66      *       
     67      */
     68     @Bean
     69     public Queue bcwPayControlRetryQueue() {
     70         Map arguments = new HashMap<>();
     71         arguments.put("x-message-ttl", 10 * 1000);
     72         arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC);
     73 //                   some-routing-key   some.exchange.name,                  routing key
     74         arguments.put("x-dead-letter-routing-key", "queue_bcw.push");
     75         return new Queue("queue_bcw@pay@retry", true, false, false, arguments);
     76     }
     77     /**
     78      *       
     79      */
     80     @Bean
     81     public Queue bcwPushControlRetryQueue() {
     82         Map arguments = new HashMap<>();
     83         arguments.put("x-message-ttl", 10 * 1000);
     84         arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC);
     85 //                   some-routing-key   some.exchange.name,                  routing key
     86         arguments.put("x-dead-letter-routing-key", "queue_bcw.push");
     87         return new Queue("queue_bcw@push@retry", true, false, false, arguments);
     88     }
     89     /**
     90      *       
     91      */
     92     @Bean
     93     public Queue bcwPullControlRetryQueue() {
     94         Map arguments = new HashMap<>();
     95         arguments.put("x-message-ttl", 10 * 1000);
     96         arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC);
     97 //                   some-routing-key   some.exchange.name,                  routing key
     98 //        arguments.put("x-dead-letter-routing-key", "queue_bcw");
     99         return new Queue("queue_bcw@pull@retry", true, false, false, arguments);
    100     }
    101     @Bean
    102     public Binding  bcwPayControlRetryBinding() {
    103         return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.pay.retry");
    104     }
    105     @Bean
    106     public Binding  bcwPushControlRetryBinding() {
    107         return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.push.retry");
    108     }
    109     @Bean
    110     public Binding   bcwPullControlRetryBinding() {
    111         return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.pull.retry");
    112     }
    113 
    114   /**
    115    *         RoutingKey
    116    *
    117    * @param queueMessages     
    118    * @param exchange         
    119    * @return   
    120    */
    121   @Bean
    122   Binding bcwPushBindingQueue(@Qualifier("bcwPushControlQueue") Queue queueMessages,@Qualifier("controlExchange") TopicExchange exchange) {
    123     return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.push");
    124   }
    125   /**
    126    *         RoutingKey
    127    *
    128    * @param queueMessages     
    129    * @param exchange         
    130    * @return   
    131    */
    132   @Bean
    133   Binding bcwPayBindingQueue(@Qualifier("bcwPayControlQueue") Queue queueMessages, @Qualifier("controlExchange") TopicExchange exchange) {
    134     return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.pay");
    135   }
    136   /**
    137    *         RoutingKey
    138    *
    139    * @param queueMessages     
    140    * @param exchange         
    141    * @return   
    142    */
    143   @Bean
    144   Binding bcwPullBindingQueue(@Qualifier("bcwPullControlQueue") Queue queueMessages,@Qualifier("controlExchange") TopicExchange exchange) {
    145     return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.pull");
    146   }
    147 
    148   @Bean
    149   @ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
    150   public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
    151           SimpleRabbitListenerContainerFactoryConfigurer configurer,
    152           ConnectionFactory connectionFactory) {
    153     SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    154     configurer.configure(factory, connectionFactory);
    155     factory.setMessageConverter(jacksonAmqpMessageConverter());
    156     return factory;
    157   }

    次は私たちのテーマです.タイミングタスクはorgを使用しています.springframework.scheduling
      1 /**
      2  *        ,          
      3  */
      4 public abstract class QuartzSimpleMessageListenerContainer extends SimpleMessageListenerContainer {
      5     protected final Logger logger = LoggerFactory.getLogger(getClass());
      6     private List body = new LinkedList<>();
      7     public long start_time;
      8     private Channel channel;
      9     @Autowired
     10     private ObjectMapper objectMapper;
     11     @Autowired
     12     private RabbitTemplate rabbitTemplate;
     13 
     14     public QuartzSimpleMessageListenerContainer() {
     15         //     
     16         this.setAcknowledgeMode(AcknowledgeMode.MANUAL);
     17 
     18         this.setMessageListener((ChannelAwareMessageListener)  (message,channel)  -> {
     19             long current_time = System.currentTimeMillis();
     20             int time = (int) ((current_time - start_time)/1000);
     21             logger.info("====   {}     =====",message.getMessageProperties().getConsumerQueue());
     22             Long retryCount = getRetryCount(message.getMessageProperties());
     23             if (retryCount > 3) {
     24                 logger.info("====         {}        =====",message.getMessageProperties().getConsumerQueue());
     25                 try {
     26                     channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
     27                 } catch (IOException ex) {
     28                     ex.printStackTrace();
     29                 }
     30                 return;
     31             }
     32 
     33             this.body.add(message);
     34             /**
     35              *           ,                
     36              *                  ,         
     37              */
     38             if(body.size()>=3 || time>60){
     39                 this.channel = channel;
     40                 callback();
     41             }
     42         });
     43 
     44 
     45 
     46     }
     47     private void callback(){
     48 //         channel = getChannel(getTransactionalResourceHolder());
     49         if(body.size()>0 && channel !=null &&  channel.isOpen()){
     50             try {
     51                 callbackWork();
     52             }catch (Exception e){
     53                 logger.error("      :{}",e.getMessage());
     54 
     55                 body.stream().forEach(message -> {
     56                     Long retryCount = getRetryCount(message.getMessageProperties());
     57                     if (retryCount <= 3) {
     58                         logger.info("           ,    :" + retryCount);
     59                         rabbitTemplate.convertAndSend(Exchanges.ExangeTOPIC, message.getMessageProperties().getReceivedRoutingKey()+".retry", message);
     60                     }
     61                 });
     62 
     63             } finally{
     64 
     65                 logger.info("flsher too data");
     66 
     67                 body.stream().forEach(message -> {
     68                     //  acknowledge
     69                     try {
     70                         channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
     71                     } catch (IOException e) {
     72                         logger.error("        !");
     73                         e.printStackTrace();
     74                     }
     75                 });
     76 
     77                 body.clear();
     78                 this.stop();
     79 
     80             }
     81         }
     82 
     83     }
     84     abstract void callbackWork() throws Exception;
     85     /**
     86      *         
     87      * @param properties
     88      * @return
     89      */
     90     private long getRetryCount(MessageProperties properties){
     91         long retryCount = 0L;
     92         Map header = properties.getHeaders();
     93         if(header != null && header.containsKey("x-death")){
     94             List> deaths = (List>)header.get("x-death");
     95             if(deaths.size()>0){
     96                 Map death = deaths.get(0);
     97                 retryCount = (Long)death.get("count");
     98             }
     99         }
    100         return retryCount;
    101     }
    102 
    103     @Override
    104     @Scheduled(cron = "0 0/2 * * * ? ")
    105     public void start() {
    106         logger.info("start push data scheduled!");
    107         //     ,       stop  ,   rabbit
    108         body.clear();
    109         super.stop();
    110         start_time = System.currentTimeMillis();
    111         super.start();
    112 
    113         logger.info("end push data scheduled!");
    114     }
    115 
    116     public List getBody() {
    117 
    118         List collect = body.stream().map(data -> {
    119                     byte[] body = data.getBody();
    120                     WDNJPullOrder readValue = null;
    121                     try {
    122                         readValue = objectMapper.readValue(body, new TypeReference() {
    123                         });
    124                     } catch (IOException e) {
    125                         logger.error("      {}",e.getMessage());
    126                     }
    127                     return readValue;
    128                 }
    129         ).collect(Collectors.toList());
    130 
    131         return collect;
    132 
    133 
    134     }
    135 
    136 }

     
    に続く
     
    もちろんタイミングタスクの起動は、rabbitコンテナの実装に関連することができますが、ここでは必要ありませんので、この小さな変更については、学生は自分で実現することができます
     @Scheduled(cron = "0 0/2 * * * ? ")
    
    public void start()