RocketMQソース-十、RocketMQシーケンスメッセージ


RocketMQ自体はシーケンスメッセージをサポートしており、使用上のシーケンスメッセージと非シーケンスメッセージの違いがあります
送信順序メッセージ
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
}, orderId);

sendメソッドにはパラメータMessageQueueSelectorがあり,MessageQueueSelectorはユーザ自身にメッセージがどのキューに送信されるかを決定させ,ローカルメッセージであればメッセージとキューの対応関係を決定するために用いられる.
シーケンスメッセージ消費
consumer.registerMessageListener(new MessageListenerOrderly() {
    AtomicLong consumeTimes = new AtomicLong(0);

    @Override
    public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
        context.setAutoCommit(false);
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeOrderlyStatus.SUCCESS;
    }
});

使用上,シーケンスメッセージは送信から消費までの全過程で秩序を保証する必要があると推定できるため,シーケンスメッセージは具体的には
  • 送信メッセージは、シーケンスの
  • である.
  • brokerストレージメッセージは、順序の
  • である.
  • consumer消費は順番の
  • 次にrmqがシーケンスメッセージをどのように実現するかをそれぞれ見てみましょう.
    送信順序メッセージ
    brokerはメッセージ秩序を格納する前提はproducerがメッセージを送信することが秩序であるため、この2つを結合して言う.
    メッセージのパブリケーションは秩序ある意味です.producerがメッセージを送信するのは順番に送信されるべきなので、メッセージを送信するように要求するときは保証します.
  • メッセージは非同期で送信できません.同期送信時にbroker受信が秩序化されていることを保証できます.
  • 送信ごとに選択するのは同じMessageQueue
  • である.
    同期送信
    最初の例からメッセージを送信すると、次の方法が呼び出されます.
    public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout);
    }

    CommunicationMode.SYNCはproducerがメッセージを送信する際に同期して送信されることを示している.同期送信は、producerがメッセージを送信した後、すぐに戻ることはなく、brokerのresponseを待つことを示します.
    brokerはproducerの要求を受け取った後、スレッド処理を開始するが、スレッドがcommitLogにメッセージを書き込むとresponseをproducerに送信し、producerはbrokerのresponseを受信し、処理に成功した後にメッセージ送信に成功する.
    同じMessageQueue
    brokerがメッセージを受信するのも順序であることを保証するために、producerはいずれかのキューにメッセージを送信するしかありません.同じキューのみがメッセージが同じbrokerに送信されることを保証し、同じbrokerが送信されたメッセージを処理してこそ順序を保証することができるからです.したがって,シーケンスメッセージを送信する際にはユーザがMessageQueueを指定する必要があり,send呼び出し中に次のメソッドが呼び出され,次のメソッドではユーザが指定したselect queueのメソッドがコールバックされる.
    private SendResult sendSelectImpl(
        Message msg,
        MessageQueueSelector selector,
        Object arg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback, final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
    
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null;
            try {
                //        select       queue,       ,           queue    
                mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
            } catch (Throwable e) {
                throw new MQClientException("select message queue throwed exception.", e);
            }
    
            if (mq != null) {
                return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout);
            } else {
                throw new MQClientException("select message queue return null.", null);
            }
        }
    
        throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
    }

    このように毎回送信されるメッセージは同じMessageQueue,つまり同じbrokerに送信され,この送信メッセージのプロセスは順序を保証し,brokerがCommitLogに格納されているメッセージも順序を保証する.
    順次消費
    brokerに物理的に格納されているメッセージが順序であることを保証し、メッセージ消費が順序であることを保証すれば、プロセス全体が順序メッセージであることを保証することができる.
    やはり最初の例では,順序消費と一般消費のlistenerは異なり,順序消費が実現しなければならないのは次のようなインタフェースである.org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly
    Consumerが起動するとlistenerのタイプによってどのサービスを使って消費すべきかを判断します
    // org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
    if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
        //     
        this.consumeOrderly = true;
        this.consumeMessageService =
            new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
    } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
        //      
        this.consumeOrderly = false;
        this.consumeMessageService =
            new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
    }

    consumerがメッセージを引き出すのはoffsetに従って引くので、consumerはconsumerに引くメッセージが連続的に秩序があることを保証することができますが、consumerがメッセージを引き出した後にスレッドを起動してメッセージを処理するので、先に引いたメッセージが先に処理されることを保証することはできません.
    // org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
    // org.apache.rocketmq.client.consumer.PullCallback#onSuccess
    //          ProcessQueue
    boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
    //    consumeMessageService  ConsumeMessageOrderlyService
    DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
        pullResult.getMsgFoundList(),
        processQueue,
        pullRequest.getMessageQueue(),
        dispathToConsume);
    
    
    // org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#submitConsumeRequest
    public void submitConsumeRequest(
        final List msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispathToConsume) {
        if (dispathToConsume) {
            //         ConsumeRequest,           
            ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
            this.consumeExecutor.submit(consumeRequest);
        }
    }

    リクエストはスレッドプールに格納されているので、スレッドの実行順序が不確定になり、consumer消費は無秩序になりますか?
    答えは明らかに無秩序にはなりません.上に重要なコードがあるからです.processQueue.putMessage(pullResult.getMsgFoundList());
    まずProcessQueueという重要なデータ構造についてお話しします.MessageQueueはProcessQueueに対応し、queueIdの下でbrokerpullから戻ってきたすべてのメッセージを記録する秩序化されたキューであり、消費に成功するとキューから削除されます.ProcessQueue秩序化の理由はTreeMapを維持しているからですprivate final TreeMap msgTreeMap = new TreeMap();
    msgTreeMap:broker pullから戻ってきたすべてのメッセージがメンテナンスされています.TreeMapは秩序正しく、keyはLongタイプで、comparatorは指定されていません.デフォルトはkeyをComparableに強く変換し、比較します.keyは現在のメッセージのoffsetであり、LongはComparableインタフェースを実現しているので、msgTreeMapのメッセージはoffsetでソートされています.
    したがって、ProcessQueueは、引き戻すメッセージが秩序化されていることを保証し、上述の起動スレッドに従ってConsumeRequestを実行する.runメソッドはメッセージを消費する
    Override
    public void run() {
        //               ,            ,             
        final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
        synchronized (objLock) {
            if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                final long beginTime = System.currentTimeMillis();
                for (boolean continueConsume = true; continueConsume; ) {
                    //  ProcessQueue         ,            
                    final int consumeBatchSize =
                        ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    
                    List msgs = this.processQueue.takeMessags(consumeBatchSize);
                    //       
    }

    ProcessQueueから取得したメッセージは秩序化されており,consumerは消費の秩序性を保証している.
    まとめ
    rmqは、メッセージの送信、brokerストレージメッセージからconsumer消費メッセージへの全過程において、ループが相殺され、最終的にメッセージが秩序化されていることを保証する.