Rabbitmqの遅延キューを実現するには、このステップだけが必要です.
19100 ワード
ちえんキュー
遅延キューに格納されるオブジェクトは遅延メッセージであり、「遅延メッセージ」とは、メッセージが送信された後、消費者に直ちにメッセージを取得して消費させたくないのではなく、指定された時間を待ってから消費者がこのメッセージを取得して消費することを意味する.
シーンの適用
例えば、未払い注文が一定時間を超えると、システムは自動的に注文をキャンセルし、占有物を解放します.
インプリメンテーションモード
RabbitMQを用いて遅延キューを実現するには以下の2つの方法があり,本稿では主に1つ目,2つ目を次の記事で紹介する.
1、TTL + DLX
2、遅延カードの使用
TTL + DLX
Time To Live(TTL)
RabbitMQは、Queueに対してx-expiresを設定するか、Messageに対してx-message-ttlを設定してメッセージの生存時間を制御し、タイムアウト(両方とも最初に期限切れになる時間を基準に設定)すると、メッセージはdead letter(デッドレター)になる
RabbitMQは、キュー内のメッセージの有効期限について2つの方法で設定できます.
A:キューのプロパティ設定により、キュー内のすべてのメッセージに同じ期限が切れます.B:メッセージを個別に設定します.各メッセージTTLは異なります.
同時に使用される場合、メッセージの有効期限は、両者の間のTTLが小さい値に準じる.メッセージのキュー生存時間が設定したTTL値を超えるとdead letterとなる
DLX(Dead-Letter-Exchange)
DLX(Dead-letter-EXchange)スイッチは、メッセージが1つのキューでデッドメッセージ(Dead message)になった後、DLXに再送信されることができ、DLXをバインドするキューをデッドメッセージキューと呼ぶ.デッドメッセージキューは、拒否されたメッセージまたは送信されていないメッセージを検出することによって、問題を追跡することができる.
RabbitMQのQueueは、x-dead-letter-exchangeとx-dead-letter-routing-key(オプション)の2つのパラメータを構成し、キュー内にdead letterが表示された場合、この2つのパラメータに従って指定されたキューに再ルーティングして転送します.
キューにdead letterが表示される場合:
1、メッセージまたはキューのTTLが期限切れになる
2、キューが最大長に達する
3、メッセージは消費者側に拒否され(basic.reject or basic.nack)、requeue=false
TTLルールが設定された後、メッセージが1つのキューでデッドメッセージになった場合、DLXの特性のため、別のExchangeまたはRouting Keyに再転送され、メッセージが再消費されます.
≪インスタンス|Instance|emdw≫
次にTTL+DLXでRabbitmqの遅延キューを実現します
DLXの構成クラス
MessagePostProcessorによるメッセージの有効期限の設定
Messageオブジェクトの場合、次の方法で有効期限を設定します.
以上は単一メッセージに対して期限切れを設定したものであり、RabbitMQは各キューにメッセージのタイムアウト時間を設定することをサポートし、メッセージがキューに入ってから計算を開始し、キューのタイムアウト時間構成を超えた場合、メッセージは自動的に期限切れになる.
キュー・メッセージの有効期限を設定するプロパティは、x-message-ttlです.
注意:キューメッセージの有効期限が設定されている場合、キュー内のすべてのmessageの有効期限は同じで、メッセージの有効期限が同じ場合に適用されます.メッセージの有効期限を異にする場合は、メッセージの有効期限を個別に設定します(キュー・メッセージの有効期限と個別メッセージの有効期限が両方とも存在する場合は、有効期限が小さい方が先に実行されます).
消費情報は普通の情報と同じで、他の変更は必要ありません.
遅延キューに格納されるオブジェクトは遅延メッセージであり、「遅延メッセージ」とは、メッセージが送信された後、消費者に直ちにメッセージを取得して消費させたくないのではなく、指定された時間を待ってから消費者がこのメッセージを取得して消費することを意味する.
シーンの適用
例えば、未払い注文が一定時間を超えると、システムは自動的に注文をキャンセルし、占有物を解放します.
インプリメンテーションモード
RabbitMQを用いて遅延キューを実現するには以下の2つの方法があり,本稿では主に1つ目,2つ目を次の記事で紹介する.
1、TTL + DLX
2、遅延カードの使用
TTL + DLX
Time To Live(TTL)
RabbitMQは、Queueに対してx-expiresを設定するか、Messageに対してx-message-ttlを設定してメッセージの生存時間を制御し、タイムアウト(両方とも最初に期限切れになる時間を基準に設定)すると、メッセージはdead letter(デッドレター)になる
RabbitMQは、キュー内のメッセージの有効期限について2つの方法で設定できます.
A:キューのプロパティ設定により、キュー内のすべてのメッセージに同じ期限が切れます.B:メッセージを個別に設定します.各メッセージTTLは異なります.
同時に使用される場合、メッセージの有効期限は、両者の間のTTLが小さい値に準じる.メッセージのキュー生存時間が設定したTTL値を超えるとdead letterとなる
DLX(Dead-Letter-Exchange)
DLX(Dead-letter-EXchange)スイッチは、メッセージが1つのキューでデッドメッセージ(Dead message)になった後、DLXに再送信されることができ、DLXをバインドするキューをデッドメッセージキューと呼ぶ.デッドメッセージキューは、拒否されたメッセージまたは送信されていないメッセージを検出することによって、問題を追跡することができる.
RabbitMQのQueueは、x-dead-letter-exchangeとx-dead-letter-routing-key(オプション)の2つのパラメータを構成し、キュー内にdead letterが表示された場合、この2つのパラメータに従って指定されたキューに再ルーティングして転送します.
x-dead-letter-exchange: dead letter dead letter exchange
x-dead-letter-routing-key: dead letter dead letter routing-key
キューにdead letterが表示される場合:
1、メッセージまたはキューのTTLが期限切れになる
2、キューが最大長に達する
3、メッセージは消費者側に拒否され(basic.reject or basic.nack)、requeue=false
TTLルールが設定された後、メッセージが1つのキューでデッドメッセージになった場合、DLXの特性のため、別のExchangeまたはRouting Keyに再転送され、メッセージが再消費されます.
≪インスタンス|Instance|emdw≫
次にTTL+DLXでRabbitmqの遅延キューを実現します
DLXの構成クラス
@Configuration
public class RabbitMqConfig {
public static final String QUEUE_ORDER_CANCEL="queue_cancel";
public static final String EXCHANGE_ORDER_CANCEL="exchange_cancel";
public static final String QUEUE_TTL_ORDER_CANCEL="ttl_queue_cancel";
public static final String EXCHANGE_TTL_ORDER_CANCEL="ttl_exchange_cancel";
/**
*
*/
@Bean
DirectExchange orderDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(EXCHANGE_ORDER_CANCEL)
.durable(true)
.build();
}
/**
*
*/
@Bean
DirectExchange orderTtlDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(EXCHANGE_TTL_ORDER_CANCEL)
.durable(true)
.build();
}
/**
*
*/
@Bean
public Queue orderQueue() {
return new Queue(QUEUE_ORDER_CANCEL);
}
/**
* ( )
*/
@Bean
public Queue orderTtlQueue() {
return QueueBuilder
.durable(QUEUE_TTL_ORDER_CANCEL)
.withArgument("x-dead-letter-exchange", EXCHANGE_ORDER_CANCEL)//
.withArgument("x-dead-letter-routing-key", QUEUE_ORDER_CANCEL)//
.build();
}
/**
*
*/
@Bean
Binding orderBinding(){
return BindingBuilder
.bind(orderQueue())
.to(orderDirect())
.with(QUEUE_ORDER_CANCEL);
}
/**
*
*/
@Bean
Binding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){
return BindingBuilder
.bind(orderTtlQueue())
.to(orderTtlDirect())
.with(QUEUE_TTL_ORDER_CANCEL);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate =new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
}
MessagePostProcessorによるメッセージの有効期限の設定
import org.springframework.amqp.core.Message;
@Component
public class RabbitmqSender {
private static Logger LOGGER = LoggerFactory.getLogger(RabbitmqSender.class);
@Autowired
private AmqpTemplate amqpTemplate;
public void sendMessage(Long orderId,final long delayTimes){
//
amqpTemplate.convertAndSend(EXCHANGE_TTL_ORDER_CANCEL, QUEUE_TTL_ORDER_CANCEL, orderId, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//
message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
return message;
}
});
LOGGER.info("send delay message orderId:{}",orderId);
}
}
Messageオブジェクトの場合、次の方法で有効期限を設定します.
Message message=new Message("hello".getBytes(),new MessageProperties());
message.getMessageProperties().setExpiration(String.valueOf(1000));
以上は単一メッセージに対して期限切れを設定したものであり、RabbitMQは各キューにメッセージのタイムアウト時間を設定することをサポートし、メッセージがキューに入ってから計算を開始し、キューのタイムアウト時間構成を超えた場合、メッセージは自動的に期限切れになる.
キュー・メッセージの有効期限を設定するプロパティは、x-message-ttlです.
注意:キューメッセージの有効期限が設定されている場合、キュー内のすべてのmessageの有効期限は同じで、メッセージの有効期限が同じ場合に適用されます.メッセージの有効期限を異にする場合は、メッセージの有効期限を個別に設定します(キュー・メッセージの有効期限と個別メッセージの有効期限が両方とも存在する場合は、有効期限が小さい方が先に実行されます).
/**
* ( )
*/
@Bean
public Queue orderTtlQueue() {
return QueueBuilder
.durable(QUEUE_TTL_ORDER_CANCEL)
.withArgument("x-dead-letter-exchange", EXCHANGE_ORDER_CANCEL)//
.withArgument("x-dead-letter-routing-key", QUEUE_ORDER_CANCEL)//
.withArgument("x-message-ttl", 10000)// ,
.build();
}
消費情報は普通の情報と同じで、他の変更は必要ありません.