RabbitMQ消費側の制限フロー戦略

7400 ワード

1つのシーンを仮定すると、私たちの消費者側が突然すべて使用できなくなったため、rabbitMQサーバには何万もの未処理のメッセージがあり、このとき何もしていなければ、勝手に1つの消費者側クライアントを開くと、大量のメッセージが瞬時にすべてプッシュされますが、私たちの1つのクライアントはこのような多くのデータを同時に処理することはできません.消費者側が巨大なカードになり、直接崩壊して使えなくなる可能性があります.従って、実際の生産では、限流保護が重要である.
rabbitMQは、非自動確認メッセージを前提に、一定数のメッセージ(consumeまたはchannelに基づいてQOSの値を設定する)が確認されないまで、新しいメッセージを消費しないqos(サービス品質保証)機能を提供する.キーコードは消費者コードを宣言するものです
void basicQos(unit prefetchSize , ushort prefetchCount, bool global )
  • prefetchSize:0
  • prefetchCount:RabbitMQは、N個以上のメッセージを同時に消費者にプッシュしないでください.すなわち、N個のメッセージがまだackがない場合、consumerはメッセージack
  • があるまでblockを削除します.
  • global:true、falseが上の設定をchannelに適用するかどうか、簡単に言えば、上の制限がchannelレベルなのかconsumerレベル
  • なのか
    備考:prefetchSizeとglobalの2つはrabbitmqでは実現されず,しばらく検討しない.特に、prefetchCountはno_ask=falseの場合に有効になります.すなわち、自動応答の場合、この2つの値は有効になりません.
    コードのデモ:
        :    https://github.com/hmilyos/rabbitmqdemo.git  rabbitmq-api    
    

    生産側コードはほとんど変わっておらず、exchangeとroutingKeyを変更しただけです
    public class Procuder {
    
    	private static final Logger log = LoggerFactory.getLogger(Procuder.class);
    
    	public static void main(String[] args) throws IOException, TimeoutException {
    		ConnectionFactory connectionFactory = new ConnectionFactory();
    		connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
    		connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
    		connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
    
    		Connection connection = connectionFactory.newConnection();
    		Channel channel = connection.createChannel();
    
    		String msg = "Hello RabbitMQ limit Message";
            for(int i = 0; i < 5; i ++){
                log.info("     :{}", msg + i);
                channel.basicPublish(Consumer.EXCHANGE_NAME, Consumer.ROUTING_KEY, true, null, (msg + i).getBytes());
            }
    	}
    }
    

    消費者側コードはautoAckをfalse**に設定**channelを追加する必要があります.basicQos(0, 1, false);
    完全な消費者側コードは以下の通りです.
    /**
     *         
     */
    public class Consumer {
    
    	private static final Logger log = LoggerFactory.getLogger(Consumer.class);
    	
    	public static final String EXCHANGE_NAME = "test_qos_exchange";
    	public static final String EXCHANGE_TYPE = "topic";
    	public static final String ROUTING_KEY_TYPE = "qos.#";
    	public static final String ROUTING_KEY = "cqos.save";
    	public static final String QUEUE_NAME = "test_qos_queue";
    	
    	public static void main(String[] args) throws IOException, TimeoutException {
    		//1   ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
            connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
            connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
            //2   C	onnection
            Connection connection = connectionFactory.newConnection();
            //3   Connection      Channel
            Channel channel = connection.createChannel();
            
            channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, null);
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_TYPE);
            
            /**
             * prefetchSize:0
             prefetchCount:   RabbitMQ              N   ,  N 
                     N        ack,   consumer   block  ,      ack   ,     N    
             global:true\false           channel  ,false consumer  
             prefetchSize  global   ,rabbitmq    ,     
             */
            channel.basicQos(0, 1, false);
    
            //        
            //1              autoAck    false
          //        
            channel.basicConsume(QUEUE_NAME, false, new MyConsumer(channel));
            log.info("       ");
    	}
    }
    
    

    カスタム消費者
    public class MyConsumer extends DefaultConsumer {
    
    	private static final Logger log = LoggerFactory.getLogger(MyConsumer.class);
    	
    	 private Channel channel;
    	 
    	public MyConsumer(Channel channel) {
    		super(channel);
    		this.channel = channel;
    	}
    
    	@Override
        public void handleDelivery(String consumerTag,  //     
                                   Envelope envelope,
                                   AMQP.BasicProperties properties,
                                   byte[] body) throws IOException {
            
            log.info("------limit-----consume message----------");
            log.info("consumerTag: " + consumerTag);
            log.info("envelope: " + envelope);
            log.info("properties: " + properties);
            log.info("body: " + new String(body));
            //     ACK  
           //channel.basicAck(envelope.getDeliveryTag(), false);
        }
    }
    
    

    その後、消費者側を起動し、管理台に行ってtestを表示します.qos_Exchangeとtest_qos_Queueが確認test_を生成したかどうかqos_Exchangeにtest_がバインドされていますqos_queue
    本番を開始して5つのメッセージを送信
    消費者側が1つのメッセージしか印刷していないことに気づいた
    管理台からも計5件のメッセージが表示され、4件が待機しており、1件は消費したがackが戻ってカスタム消費者のコードを修正していない.以下に示す.
    public class MyConsumer extends DefaultConsumer {
    
    	private static final Logger log = LoggerFactory.getLogger(MyConsumer.class);
    	
    	 private Channel channel;
    	 
    	public MyConsumer(Channel channel) {
    		super(channel);
    		this.channel = channel;
    	}
    
    	@Override
        public void handleDelivery(String consumerTag,  //     
                                   Envelope envelope,
                                   AMQP.BasicProperties properties,
                                   byte[] body) throws IOException {
            
            log.info("------limit-----consume message----------");
            log.info("consumerTag: " + consumerTag);
            log.info("envelope: " + envelope);
            log.info("properties: " + properties);
            log.info("body: " + new String(body));
            //     ACK  
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    }
    

    消費者側を再起動し、消費者側を見ると1本1本消費し、ACKは戻った.
    以上のように簡単なRabbitMQ消費側の限流戦略である