RocketMQシーケンスメッセージ

4825 ワード

シーケンスメッセージ


以前、ローカルで使用していたclientバージョンは3.6.2でしたが、会社のサーバには3.2.6のバージョンがインストールされています.テスト順序メッセージが失敗し続けた.その後、クライアントバージョンを3.2.6に下げてテストに成功しました.だから、使用するときは、バージョンのマッチングに注意してください.そうしないと、奇妙なエラーが発生する可能性があります.

producer

package com.yunsheng.orderExample;

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;

import java.util.List;


public class OrderedProducer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("example_group_name");
        producer.setNamesrvAddr("10.135.17.26:9876;10.135.17.27:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 10; i++) {
            int orderId = 0;
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicOrder","TagA", "KEY" + i,
                    ("Hello RocketMQ " + i).getBytes());
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                public MessageQueue select(List mqs, Message msg, Object arg) {
                    Integer id = (Integer) arg;
                    int index = id % mqs.size();
                    return mqs.get(index);
                }
            }, orderId);

            System.out.printf("%s%n", sendResult);
        }
        //server shutdown
        producer.shutdown();
    }
}

解析:メッセージの順序性を保証するには、メッセージを送信するときに、このメッセージのセットを同じqueueに送信する必要があります.(brokerのデフォルト4つのqueue).上記のコードでは、orderIdは受注番号を表します.send法でセレクタを実現した.このセレクタの役割は、orderIdがqueueの数をモデリングし、同じorderIdのすべてのメッセージが同じqueueに落ちることを保証することです.

consumer

package com.yunsheng.orderExample;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.yunsheng.Factory;

import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;

public class OrderedConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
        consumer.setNamesrvAddr("10.135.17.26:9876;10.135.17.27:9876");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicOrder", "TagA");

        consumer.registerMessageListener(new MessageListenerOrderly() {

            Random random = new Random(10);
            public ConsumeOrderlyStatus consumeMessage(List msgs,
                                                       ConsumeOrderlyContext context) {
                context.setAutoCommit(true);

                for (MessageExt msg : msgs) {
                    System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + new String(msg.getBody()) + "%n");

                }

                try {
                    Thread.sleep(random.nextInt());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                return ConsumeOrderlyStatus.SUCCESS;

            }
        });

        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

解析:上は生産側のメッセージの順序性を保証して、それでは消費側はメッセージが順序の消費を保証しなければならない.MessageListenerOrderlyを使用します.役割は、前のメッセージが消費されるのを待たなければ、後のメッセージが消費されないことです.コードにsleep検証を追加しました.
結果:
ConsumeMessageThread_1 Receive New Messages: Hello RocketMQ 1
ConsumeMessageThread_2 Receive New Messages: Hello RocketMQ 2
ConsumeMessageThread_4 Receive New Messages: Hello RocketMQ 3
ConsumeMessageThread_6 Receive New Messages: Hello RocketMQ 4
ConsumeMessageThread_5 Receive New Messages: Hello RocketMQ 5
ConsumeMessageThread_7 Receive New Messages: Hello RocketMQ 6
ConsumeMessageThread_8 Receive New Messages: Hello RocketMQ 7
ConsumeMessageThread_9 Receive New Messages: Hello RocketMQ 8
ConsumeMessageThread_10 Receive New Messages: Hello RocketMQ 9

単一スレッド処理ではないが,順序消費が保証されていることがわかる.