Rabbitmqの遅延キューを実現するには、このステップだけが必要です.

19100 ワード

ちえんキュー
遅延キューに格納されるオブジェクトは遅延メッセージであり、「遅延メッセージ」とは、メッセージが送信された後、消費者に直ちにメッセージを取得して消費させたくないのではなく、指定された時間を待ってから消費者がこのメッセージを取得して消費することを意味する.
シーンの適用
例えば、未払い注文が一定時間を超えると、システムは自動的に注文をキャンセルし、占有物を解放します.
インプリメンテーションモード
RabbitMQを用いて遅延キューを実現するには以下の2つの方法があり,本稿では主に1つ目,2つ目を次の記事で紹介する.
1、TTL + DLX
2、遅延カードの使用
TTL + DLX
Time To Live(TTL)
RabbitMQは、Queueに対してx-expiresを設定するか、Messageに対してx-message-ttlを設定してメッセージの生存時間を制御し、タイムアウト(両方とも最初に期限切れになる時間を基準に設定)すると、メッセージはdead letter(デッドレター)になる
RabbitMQは、キュー内のメッセージの有効期限について2つの方法で設定できます.
A:キューのプロパティ設定により、キュー内のすべてのメッセージに同じ期限が切れます.B:メッセージを個別に設定します.各メッセージTTLは異なります.
同時に使用される場合、メッセージの有効期限は、両者の間のTTLが小さい値に準じる.メッセージのキュー生存時間が設定したTTL値を超えるとdead letterとなる
DLX(Dead-Letter-Exchange)
DLX(Dead-letter-EXchange)スイッチは、メッセージが1つのキューでデッドメッセージ(Dead message)になった後、DLXに再送信されることができ、DLXをバインドするキューをデッドメッセージキューと呼ぶ.デッドメッセージキューは、拒否されたメッセージまたは送信されていないメッセージを検出することによって、問題を追跡することができる.
RabbitMQのQueueは、x-dead-letter-exchangeとx-dead-letter-routing-key(オプション)の2つのパラメータを構成し、キュー内にdead letterが表示された場合、この2つのパラメータに従って指定されたキューに再ルーティングして転送します.
x-dead-letter-exchange:  dead letter   dead letter       exchange

x-dead-letter-routing-key:  dead letter   dead letter       routing-key  

キューにdead letterが表示される場合:
1、メッセージまたはキューのTTLが期限切れになる
2、キューが最大長に達する
3、メッセージは消費者側に拒否され(basic.reject or basic.nack)、requeue=false
TTLルールが設定された後、メッセージが1つのキューでデッドメッセージになった場合、DLXの特性のため、別のExchangeまたはRouting Keyに再転送され、メッセージが再消費されます.
≪インスタンス|Instance|emdw≫
次にTTL+DLXでRabbitmqの遅延キューを実現します
DLXの構成クラス
@Configuration
public class RabbitMqConfig {

    public static final String QUEUE_ORDER_CANCEL="queue_cancel";

    public static final String EXCHANGE_ORDER_CANCEL="exchange_cancel";

    public static final String QUEUE_TTL_ORDER_CANCEL="ttl_queue_cancel";

    public static final String EXCHANGE_TTL_ORDER_CANCEL="ttl_exchange_cancel";


    /**
     *               
     */
    @Bean
    DirectExchange orderDirect() {
        return (DirectExchange) ExchangeBuilder
                .directExchange(EXCHANGE_ORDER_CANCEL)
                .durable(true)
                .build();
    }

    /**
     *                
     */
    @Bean
    DirectExchange orderTtlDirect() {
        return (DirectExchange) ExchangeBuilder
                .directExchange(EXCHANGE_TTL_ORDER_CANCEL)
                .durable(true)
                .build();
    }

    /**
     *           
     */
    @Bean
    public Queue orderQueue() {
        return new Queue(QUEUE_ORDER_CANCEL);
    }

    /**
     *       (    )
     */
    @Bean
    public Queue orderTtlQueue() {
        return QueueBuilder
                .durable(QUEUE_TTL_ORDER_CANCEL)
                .withArgument("x-dead-letter-exchange", EXCHANGE_ORDER_CANCEL)//         
                .withArgument("x-dead-letter-routing-key", QUEUE_ORDER_CANCEL)//         
                .build();
    }

    /**
     *                  
     */
    @Bean
    Binding orderBinding(){
        return BindingBuilder
                .bind(orderQueue())
                .to(orderDirect())
                .with(QUEUE_ORDER_CANCEL);
    }

    /**
     *                
     */
    @Bean
    Binding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){
        return BindingBuilder
                .bind(orderTtlQueue())
                .to(orderTtlDirect())
                .with(QUEUE_TTL_ORDER_CANCEL);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){

        RabbitTemplate rabbitTemplate =new RabbitTemplate(connectionFactory);

        return rabbitTemplate;

    }


}


MessagePostProcessorによるメッセージの有効期限の設定
import org.springframework.amqp.core.Message;

@Component
public class RabbitmqSender {

    private static Logger LOGGER = LoggerFactory.getLogger(RabbitmqSender.class);

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendMessage(Long orderId,final long delayTimes){
        //         
        amqpTemplate.convertAndSend(EXCHANGE_TTL_ORDER_CANCEL, QUEUE_TTL_ORDER_CANCEL, orderId, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //          
                message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
                return message;
            }
        });
        LOGGER.info("send delay message orderId:{}",orderId);
    }
}

Messageオブジェクトの場合、次の方法で有効期限を設定します.
Message message=new Message("hello".getBytes(),new MessageProperties());
message.getMessageProperties().setExpiration(String.valueOf(1000));

以上は単一メッセージに対して期限切れを設定したものであり、RabbitMQは各キューにメッセージのタイムアウト時間を設定することをサポートし、メッセージがキューに入ってから計算を開始し、キューのタイムアウト時間構成を超えた場合、メッセージは自動的に期限切れになる.
キュー・メッセージの有効期限を設定するプロパティは、x-message-ttlです.
注意:キューメッセージの有効期限が設定されている場合、キュー内のすべてのmessageの有効期限は同じで、メッセージの有効期限が同じ場合に適用されます.メッセージの有効期限を異にする場合は、メッセージの有効期限を個別に設定します(キュー・メッセージの有効期限と個別メッセージの有効期限が両方とも存在する場合は、有効期限が小さい方が先に実行されます).
    /**
     *       (    )
     */
    @Bean
    public Queue orderTtlQueue() {
        return QueueBuilder
                .durable(QUEUE_TTL_ORDER_CANCEL)
                .withArgument("x-dead-letter-exchange", EXCHANGE_ORDER_CANCEL)//         
                .withArgument("x-dead-letter-routing-key", QUEUE_ORDER_CANCEL)//         
                .withArgument("x-message-ttl", 10000)//          ,   
                .build();
    }

消費情報は普通の情報と同じで、他の変更は必要ありません.