RocketMQシーケンスメッセージ
4825 ワード
シーケンスメッセージ
以前、ローカルで使用していたclientバージョンは3.6.2でしたが、会社のサーバには3.2.6のバージョンがインストールされています.テスト順序メッセージが失敗し続けた.その後、クライアントバージョンを3.2.6に下げてテストに成功しました.だから、使用するときは、バージョンのマッチングに注意してください.そうしないと、奇妙なエラーが発生する可能性があります.
producer
package com.yunsheng.orderExample;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import java.util.List;
public class OrderedProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("example_group_name");
producer.setNamesrvAddr("10.135.17.26:9876;10.135.17.27:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 10; i++) {
int orderId = 0;
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicOrder","TagA", "KEY" + i,
("Hello RocketMQ " + i).getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
//server shutdown
producer.shutdown();
}
}
解析:メッセージの順序性を保証するには、メッセージを送信するときに、このメッセージのセットを同じqueueに送信する必要があります.(brokerのデフォルト4つのqueue).上記のコードでは、orderIdは受注番号を表します.send法でセレクタを実現した.このセレクタの役割は、orderIdがqueueの数をモデリングし、同じorderIdのすべてのメッセージが同じqueueに落ちることを保証することです.
consumer
package com.yunsheng.orderExample;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.yunsheng.Factory;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
public class OrderedConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
consumer.setNamesrvAddr("10.135.17.26:9876;10.135.17.27:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicOrder", "TagA");
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random(10);
public ConsumeOrderlyStatus consumeMessage(List msgs,
ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + new String(msg.getBody()) + "%n");
}
try {
Thread.sleep(random.nextInt());
} catch (InterruptedException e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
解析:上は生産側のメッセージの順序性を保証して、それでは消費側はメッセージが順序の消費を保証しなければならない.MessageListenerOrderlyを使用します.役割は、前のメッセージが消費されるのを待たなければ、後のメッセージが消費されないことです.コードにsleep検証を追加しました.
結果:
ConsumeMessageThread_1 Receive New Messages: Hello RocketMQ 1
ConsumeMessageThread_2 Receive New Messages: Hello RocketMQ 2
ConsumeMessageThread_4 Receive New Messages: Hello RocketMQ 3
ConsumeMessageThread_6 Receive New Messages: Hello RocketMQ 4
ConsumeMessageThread_5 Receive New Messages: Hello RocketMQ 5
ConsumeMessageThread_7 Receive New Messages: Hello RocketMQ 6
ConsumeMessageThread_8 Receive New Messages: Hello RocketMQ 7
ConsumeMessageThread_9 Receive New Messages: Hello RocketMQ 8
ConsumeMessageThread_10 Receive New Messages: Hello RocketMQ 9
単一スレッド処理ではないが,順序消費が保証されていることがわかる.