Springboot 2 Rabbitmqメッセージ確認メカニズム、べき乗等性、再試行メカニズムの統合


1.メッセージ確認メカニズム
友情はこの文章が前の文章に続いていることを示しています.接続:https://blog.csdn.net/qq_41085151/article/details/107102962
質問1:メッセージを送信するときに消費者に異常が発生すると、あなたが傍受しているメッセージは常に消費を循環します.たとえば
一.プロバイダ:
@Component
public class HelloProvider{

    @Autowired
    private RabbitTemplate rabbitTemplate1;

    public void send2(User user) {
        JSONObject jsonObject=new JSONObject();
        String userjson = jsonObject.toJSONString(user);
        Message message = MessageBuilder.withBody(userjson.getBytes())
                .setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8").build();
        rabbitTemplate1.convertAndSend("directExchange","direct", message);
    }
    
}

二.消費者:
@Component
public class HelloConsumer {

    @RabbitListener(queues = "direct")
    @RabbitHandler
    public void process(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody());
        JSONObject jsonObject = JSONObject.parseObject(msg);
        String name = jsonObject.getString("name");
        System.out.println(jsonObject);
        int a = 1 / 0;
    }
}
 
      pom
        
            com.alibaba
            fastjson
            1.2.28
        

制御層メソッドでプロバイダのsend 2()メソッドを呼び出すと、コンソールは、rabbitmqのメッセージ確認メカニズムが送信消者(Producer)発見メッセージがスイッチに送信されているため、消費者がこのメッセージを消費しているかどうかを判断し、ackメッセージの確認をオンにしていない場合、rabbitmqはこのメッセージが消費されていないと判断し、メッセージを再びキューに入れる.再び消費させ、死の循環を形成します.
三.プロファイルに設定消費者側手動ack確認を追加する
server.port=8889

spring.rabbitmq.host=192.168.221.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=zl
spring.rabbitmq.password=123
#        
spring.rabbitmq.publisher-confirms=true
#            
spring.rabbitmq.publisher-returns=true

#    true                        return  ,       
spring.rabbitmq.template.mandatory=true

spring.rabbitmq.connection-timeout=15000
#         
spring.rabbitmq.virtual-host=/

#        ack   none     auto      manual    
spring.rabbitmq.listener.simple.acknowledge-mode=manual

四.消費者側での構成の手動決定
 @RabbitListener(queues = "direct")
    @RabbitHandler
    public void process(Message message, Channel channel) throws Exception {
        String  msg = new String(message.getBody());
        JSONObject jsonObject = JSONObject.parseObject(msg);
        String name = jsonObject.getString("name");
        System.out.println(jsonObject);
        try {
            int a=1/0;
     /**
     *       :
* channel.basicAck(deliveryTag, false);
* deliveryTag: index
* multiple: .true: ack deliveryTag
*/ channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // /** * :
* channel.basicNack(long deliveryTag, boolean multiple, boolean requeue) ;
* deliveryTag: index
* multiple: .true: deliveryTag 。
* requeue:
*/ channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); e.printStackTrace(); } }

解析:一般的に、タイプ変換の例外など、コードに発生する異常がある場合は、コード修正を変更する必要があります.実際のビジネスシーンでは
システムのインタフェースが空を返すと、処理ができます.一般的なビジネスシーンでは、メッセージ、すなわちコードを簡単に失うことはありません.
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);

最後のパラメータrequeueは一般的にtrueであり,今回はデータが呼び出されず,このメッセージをキューに戻して消費し,コードにint a=1/0が現れた場合,デッドサイクルをもたらす.
2.メッセージ再試行メカニズム
手動ackをオンにしたときに再消費側が消費時に異常が発生しても循環消費を招くため、メッセージ再試行メカニズムを起動するには、デフォルトでは3回再試行してメッセージを消費し、消費が完了していない場合はメッセージを破棄(削除)したり、デッドラインキューに入れたり、手動補償を行ったりします.  
一.プロファイル
server.port=8889

spring.rabbitmq.host=192.168.221.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=zl
spring.rabbitmq.password=123
#        
spring.rabbitmq.publisher-confirms=true
#            
spring.rabbitmq.publisher-returns=true

#    true                        return  ,       
spring.rabbitmq.template.mandatory=true

spring.rabbitmq.connection-timeout=15000
#         
spring.rabbitmq.virtual-host=/

#        ack   none     auto      manual    
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#       
spring.rabbitmq.listener.simple.concurrency=1
#       
spring.rabbitmq.listener.simple.max-concurrency=1

#         ( false        ,                  )
spring.rabbitmq.listener.simple.retry.enabled=true
#    5
spring.rabbitmq.listener.simple.retry.max-attempts=5
#      
spring.rabbitmq.listener.simple.retry.initial-interval=5000

#                 (false                     )
spring.rabbitmq.listener.simple.default-requeue-rejected=true

#             ,           (unack     )
spring.rabbitmq.listener.simple.prefetch=2

再試行メカニズムをトリガするには、try/catchが異常を捕捉することなく、消費者が異常を放出する必要があります.そうしないと、デッドサイクルになります.
二.消費者コード

public class HelloConsumer {
   
    @RabbitListener(queues = "direct")
    @RabbitHandler
    public void process(Message message, Channel channel) throws Exception {
        String  msg = new String(message.getBody());
        JSONObject jsonObject = JSONObject.parseObject(msg);
        String name = jsonObject.getString("name");
        System.out.println(jsonObject);

            if ("  ".equals(name)) {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }else
            {
                System.out.println("    !");
                throw  new RuntimeException("        !");
            }
        } 
    }

コンソール
{"pass":"123456","name":"  "}
    !
{"pass":"123456","name":"  "}
    !
{"pass":"123456","name":"  "}
    !
{"pass":"123456","name":"  "}
    !
{"pass":"123456","name":"  "}
    !

5回再試行した後、メッセージを削除しました.具体的には、消費が完了していない場合は、メッセージを破棄(削除)したり、デッドラインキューに入れたり、手動で補償したりします. 
3.メッセージのべき乗等性
Rabbitmqにおけるメッセージ重複消費の問題は,消費者によってはメッセージ処理が行われているが,来なかった場合やoffsetを提出し,消費者が掛けた場合,再起動して重複消費を招く可能性がある.このニュースが消費されたかどうかをどう判断しますか?
解決策:
グローバルMessageIDを使用して消費者が同じものを使用していると判断し,べき乗等性を解決する.
一.プロバイダのsend 2メソッドの変更
 public void send2(User user) {
        JSONObject jsonObject=new JSONObject();
        String userjson = jsonObject.toJSONString(user);
        UUID uuid = UUID.randomUUID();
        //       id         id  
        Message message = MessageBuilder.withBody(userjson.getBytes())
                .setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8")
                .setMessageId(uuid + "").build(); //  id          UUID   ID
        rabbitTemplate1.convertAndSend("directExchange","direct", message);
    }

二.消費者側メソッドの修正
@Component

public class HelloConsumer {

    @RabbitListener(queues = "direct")
    @RabbitHandler
    public void process(Message message, Channel channel) throws Exception {
        String messageId = message.getMessageProperties().getMessageId();  //id   
        String ok= redis.get("msg"+messageId);// redis         id
         if (ok=="sucess"){  //               。
         channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, false);
            return;
        }
        String  msg = new String(message.getBody());
        JSONObject jsonObject = JSONObject.parseObject(msg);
        String name = jsonObject.getString("name");
        System.out.println(jsonObject);

            if ("  ".equals(name)) {
                System.out.println("    ");
                redis.set("msg"+messageId,"success");//  redis
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }else
            {
                System.out.println("    !");
                throw  new RuntimeException("        !");
            }
        }
    }

実は解決方法:1つのメッセージ消費テーブルで各メッセージを記録し、各メッセージにid(uuid)を設定し、消費したらテーブルに保存します.情報が来たときは、先に消費したかどうかを調べます.メッセージのべき乗等性.それほど悪くないです.使い方を見てください.
実はredisを使って私のspringboot統合redis編のリンクを見ることができます:https://blog.csdn.net/qq_41085151/article/details/106904937
文章の中ですべて私の个人の理解で、亲测したことがあって、各位の小さい仲间を歓迎して问题を出して、良いと思っていいですか?