DefaultMQPShConsumerImplの同時消費と順次消費

14647 ワード

DefaultMQpushConsumerImplはメッセージを引き出し、processQueueのTreeMapに入れる
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);

consumeMessageServiceは同時消費と順次消費に分けられます
シーケンス消費とは、同じ時刻に1つのqueueが1つのスレッドしか消費されないことを意味します.1つのスレッドのみを消費し,ロックによって実現し,順序はTreeMapによって実現する.
一つの事実は、D e f a u l t M QpushConsumer#c o n s u m e MessageBatchMaxSize=1、つまりデフォルトのロット消費個数は1、どういう意味ですか?
concurrentlyが消費する、1つのqueueが32個のメッセージに引き寄せられると、32個のConsumerRequestオブジェクトが作成され、1個のConsumerRequestは1個のメッセージのみがスレッドプールにコミットされ、ConsumerRequestが実行する.run.
一方、orderlyが消費する、一つのqueueが32個のメッセージに引き寄せられると、ConsumerRequestオブジェクトが作成され、スレッドプールに提出する、ConsumerRequest.runメソッドでは、TreeMapが空になるまでtake offsetが最小のメッセージを保持します.
concurrently ConsumeRequestの作成
public void submitConsumeRequest(
    final List msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    if (msgs.size() <= consumeBatchSize) {
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {
        for (int total = 0; total < msgs.size(); ) {
            List msgThis = new ArrayList(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }

            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }

                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }
}

orderly ConsumerRequestの作成
public void submitConsumeRequest(
    final List msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispathToConsume) {
    if (dispathToConsume) {
        ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
        this.consumeExecutor.submit(consumeRequest);
    }
}

concurrently ConsumeRequest#run消費主体ロジック
//
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);

orderly ConsumerRequest#run消費主体ロジック
//  
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
    for (boolean continueConsume = true; continueConsume; ) {
        //   TreeMap  
        List msgs = this.processQueue.takeMessags(consumeBatchSize);
        if (!msgs.isEmpty()) {
            status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
        } else {
            continueConsume = false;
        }
    }
    ...
}

public class MessageQueueLock {
    private ConcurrentMap mqLockTable = new ConcurrentHashMap();

    public Object fetchLockObject(final MessageQueue mq) {
        Object objLock = this.mqLockTable.get(mq);
        if (null == objLock) {
            objLock = new Object();
            Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
            if (prevLock != null) {
                objLock = prevLock;
            }
        }

        return objLock;
    }
}

 
offsetコミットoffsetについては、消費者がbrokerから引き出した次のメッセージのオフセット量である
takeメッセージを順次消費する場合は、メッセージをmsgTreeMapから取り出し、consumingMsgOrderlyTreeMapに入れて消費完了後、consumingMsgOrderlyTreeMapでのメッセージが完全に消費することを示す、consumingMsgOrderlyTreeMapをクリアし、offset=thisを設定する.consumingMsgOrderlyTreeMap.lastKey() + 1
// org.apache.rocketmq.client.impl.consumer.ProcessQueue#commit
public long commit() {
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        try {
            Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
            msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
            for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) {
                msgSize.addAndGet(0 - msg.getBody().length);
            }
            this.consumingMsgOrderlyTreeMap.clear();
            if (offset != null) {
                return offset + 1;
            }
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("commit exception", e);
    }

    return -1;
}

同時消費
メッセージをmsgTreeMapから直接削除し、msgTreeMapの最初のメッセージのqueue offset値を返します.
// org.apache.rocketmq.client.impl.consumer.ProcessQueue#removeMessage
public long removeMessage(final List msgs) {
    long result = -1;
    final long now = System.currentTimeMillis();
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        this.lastConsumeTimestamp = now;
        try {
            if (!msgTreeMap.isEmpty()) {
                result = this.queueOffsetMax + 1;
                int removedCnt = 0;
                for (MessageExt msg : msgs) {
                    MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
                    if (prev != null) {
                        removedCnt--;
                        msgSize.addAndGet(0 - msg.getBody().length);
                    }
                }
                msgCount.addAndGet(removedCnt);

                if (!msgTreeMap.isEmpty()) {
                    result = msgTreeMap.firstKey();
                }
            }
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (Throwable t) {
        log.error("removeMessage exception", t);
    }

    return result;
}

消費失敗
順次消費し、メッセージの処理に失敗し、再試行回数がしきい値より小さい場合はconsumingMsgOrderlyTreeMapからメッセージを取り出し、msgTreeMapに再読み込みし、再試行回数がしきい値を超えるとbrokerにメッセージを送信し、brokerは再試行回数に応じてSCHDULEにメッセージを送信する.TOPIC_XXXXまたはデッドラインキュー
同時消費は、メッセージの処理に失敗し、brokerに送信され、送信に失敗した場合、消費を継続します.
ConsumerRequestをコミットする2つのタイミング.1つはメッセージをプルすることであり、2つは処理中に異常が発生した後にコミットを遅延することである.