RabbitMQ消費側の制限フロー戦略
7400 ワード
1つのシーンを仮定すると、私たちの消費者側が突然すべて使用できなくなったため、rabbitMQサーバには何万もの未処理のメッセージがあり、このとき何もしていなければ、勝手に1つの消費者側クライアントを開くと、大量のメッセージが瞬時にすべてプッシュされますが、私たちの1つのクライアントはこのような多くのデータを同時に処理することはできません.消費者側が巨大なカードになり、直接崩壊して使えなくなる可能性があります.従って、実際の生産では、限流保護が重要である.
rabbitMQは、非自動確認メッセージを前提に、一定数のメッセージ(consumeまたはchannelに基づいてQOSの値を設定する)が確認されないまで、新しいメッセージを消費しないqos(サービス品質保証)機能を提供する.キーコードは消費者コードを宣言するものです prefetchSize:0 prefetchCount:RabbitMQは、N個以上のメッセージを同時に消費者にプッシュしないでください.すなわち、N個のメッセージがまだackがない場合、consumerはメッセージack があるまでblockを削除します. global:true、falseが上の設定をchannelに適用するかどうか、簡単に言えば、上の制限がchannelレベルなのかconsumerレベル なのか
備考:prefetchSizeとglobalの2つはrabbitmqでは実現されず,しばらく検討しない.特に、prefetchCountはno_ask=falseの場合に有効になります.すなわち、自動応答の場合、この2つの値は有効になりません.
コードのデモ:
生産側コードはほとんど変わっておらず、exchangeとroutingKeyを変更しただけです
消費者側コードはautoAckをfalse**に設定**channelを追加する必要があります.basicQos(0, 1, false);
完全な消費者側コードは以下の通りです.
カスタム消費者
その後、消費者側を起動し、管理台に行ってtestを表示します.qos_Exchangeとtest_qos_Queueが確認test_を生成したかどうかqos_Exchangeにtest_がバインドされていますqos_queue
本番を開始して5つのメッセージを送信
消費者側が1つのメッセージしか印刷していないことに気づいた
管理台からも計5件のメッセージが表示され、4件が待機しており、1件は消費したがackが戻ってカスタム消費者のコードを修正していない.以下に示す.
消費者側を再起動し、消費者側を見ると1本1本消費し、ACKは戻った.
以上のように簡単なRabbitMQ消費側の限流戦略である
rabbitMQは、非自動確認メッセージを前提に、一定数のメッセージ(consumeまたはchannelに基づいてQOSの値を設定する)が確認されないまで、新しいメッセージを消費しないqos(サービス品質保証)機能を提供する.キーコードは消費者コードを宣言するものです
void basicQos(unit prefetchSize , ushort prefetchCount, bool global )
備考:prefetchSizeとglobalの2つはrabbitmqでは実現されず,しばらく検討しない.特に、prefetchCountはno_ask=falseの場合に有効になります.すなわち、自動応答の場合、この2つの値は有効になりません.
コードのデモ:
: https://github.com/hmilyos/rabbitmqdemo.git rabbitmq-api
生産側コードはほとんど変わっておらず、exchangeとroutingKeyを変更しただけです
public class Procuder {
private static final Logger log = LoggerFactory.getLogger(Procuder.class);
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String msg = "Hello RabbitMQ limit Message";
for(int i = 0; i < 5; i ++){
log.info(" :{}", msg + i);
channel.basicPublish(Consumer.EXCHANGE_NAME, Consumer.ROUTING_KEY, true, null, (msg + i).getBytes());
}
}
}
消費者側コードはautoAckをfalse**に設定**channelを追加する必要があります.basicQos(0, 1, false);
完全な消費者側コードは以下の通りです.
/**
*
*/
public class Consumer {
private static final Logger log = LoggerFactory.getLogger(Consumer.class);
public static final String EXCHANGE_NAME = "test_qos_exchange";
public static final String EXCHANGE_TYPE = "topic";
public static final String ROUTING_KEY_TYPE = "qos.#";
public static final String ROUTING_KEY = "cqos.save";
public static final String QUEUE_NAME = "test_qos_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//1 ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
//2 C onnection
Connection connection = connectionFactory.newConnection();
//3 Connection Channel
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, null);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_TYPE);
/**
* prefetchSize:0
prefetchCount: RabbitMQ N , N
N ack, consumer block , ack , N
global:true\false channel ,false consumer
prefetchSize global ,rabbitmq ,
*/
channel.basicQos(0, 1, false);
//
//1 autoAck false
//
channel.basicConsume(QUEUE_NAME, false, new MyConsumer(channel));
log.info(" ");
}
}
カスタム消費者
public class MyConsumer extends DefaultConsumer {
private static final Logger log = LoggerFactory.getLogger(MyConsumer.class);
private Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, //
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
log.info("------limit-----consume message----------");
log.info("consumerTag: " + consumerTag);
log.info("envelope: " + envelope);
log.info("properties: " + properties);
log.info("body: " + new String(body));
// ACK
//channel.basicAck(envelope.getDeliveryTag(), false);
}
}
その後、消費者側を起動し、管理台に行ってtestを表示します.qos_Exchangeとtest_qos_Queueが確認test_を生成したかどうかqos_Exchangeにtest_がバインドされていますqos_queue
本番を開始して5つのメッセージを送信
消費者側が1つのメッセージしか印刷していないことに気づいた
管理台からも計5件のメッセージが表示され、4件が待機しており、1件は消費したがackが戻ってカスタム消費者のコードを修正していない.以下に示す.
public class MyConsumer extends DefaultConsumer {
private static final Logger log = LoggerFactory.getLogger(MyConsumer.class);
private Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, //
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
log.info("------limit-----consume message----------");
log.info("consumerTag: " + consumerTag);
log.info("envelope: " + envelope);
log.info("properties: " + properties);
log.info("body: " + new String(body));
// ACK
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
消費者側を再起動し、消費者側を見ると1本1本消費し、ACKは戻った.
以上のように簡単なRabbitMQ消費側の限流戦略である