DefaultMQPShConsumerImplの同時消費と順次消費
14647 ワード
DefaultMQpushConsumerImplはメッセージを引き出し、processQueueのTreeMapに入れる
consumeMessageServiceは同時消費と順次消費に分けられます
シーケンス消費とは、同じ時刻に1つのqueueが1つのスレッドしか消費されないことを意味します.1つのスレッドのみを消費し,ロックによって実現し,順序はTreeMapによって実現する.
一つの事実は、D e f a u l t M QpushConsumer#c o n s u m e MessageBatchMaxSize=1、つまりデフォルトのロット消費個数は1、どういう意味ですか?
concurrentlyが消費する、1つのqueueが32個のメッセージに引き寄せられると、32個のConsumerRequestオブジェクトが作成され、1個のConsumerRequestは1個のメッセージのみがスレッドプールにコミットされ、ConsumerRequestが実行する.run.
一方、orderlyが消費する、一つのqueueが32個のメッセージに引き寄せられると、ConsumerRequestオブジェクトが作成され、スレッドプールに提出する、ConsumerRequest.runメソッドでは、TreeMapが空になるまでtake offsetが最小のメッセージを保持します.
concurrently ConsumeRequestの作成
orderly ConsumerRequestの作成
concurrently ConsumeRequest#run消費主体ロジック
orderly ConsumerRequest#run消費主体ロジック
offsetコミットoffsetについては、消費者がbrokerから引き出した次のメッセージのオフセット量である
takeメッセージを順次消費する場合は、メッセージをmsgTreeMapから取り出し、consumingMsgOrderlyTreeMapに入れて消費完了後、consumingMsgOrderlyTreeMapでのメッセージが完全に消費することを示す、consumingMsgOrderlyTreeMapをクリアし、offset=thisを設定する.consumingMsgOrderlyTreeMap.lastKey() + 1
同時消費
メッセージをmsgTreeMapから直接削除し、msgTreeMapの最初のメッセージのqueue offset値を返します.
消費失敗
順次消費し、メッセージの処理に失敗し、再試行回数がしきい値より小さい場合はconsumingMsgOrderlyTreeMapからメッセージを取り出し、msgTreeMapに再読み込みし、再試行回数がしきい値を超えるとbrokerにメッセージを送信し、brokerは再試行回数に応じてSCHDULEにメッセージを送信する.TOPIC_XXXXまたはデッドラインキュー
同時消費は、メッセージの処理に失敗し、brokerに送信され、送信に失敗した場合、消費を継続します.
ConsumerRequestをコミットする2つのタイミング.1つはメッセージをプルすることであり、2つは処理中に異常が発生した後にコミットを遅延することである.
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
consumeMessageServiceは同時消費と順次消費に分けられます
シーケンス消費とは、同じ時刻に1つのqueueが1つのスレッドしか消費されないことを意味します.1つのスレッドのみを消費し,ロックによって実現し,順序はTreeMapによって実現する.
一つの事実は、D e f a u l t M QpushConsumer#c o n s u m e MessageBatchMaxSize=1、つまりデフォルトのロット消費個数は1、どういう意味ですか?
concurrentlyが消費する、1つのqueueが32個のメッセージに引き寄せられると、32個のConsumerRequestオブジェクトが作成され、1個のConsumerRequestは1個のメッセージのみがスレッドプールにコミットされ、ConsumerRequestが実行する.run.
一方、orderlyが消費する、一つのqueueが32個のメッセージに引き寄せられると、ConsumerRequestオブジェクトが作成され、スレッドプールに提出する、ConsumerRequest.runメソッドでは、TreeMapが空になるまでtake offsetが最小のメッセージを保持します.
concurrently ConsumeRequestの作成
public void submitConsumeRequest(
final List msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}
} else {
for (int total = 0; total < msgs.size(); ) {
List msgThis = new ArrayList(consumeBatchSize);
for (int i = 0; i < consumeBatchSize; i++, total++) {
if (total < msgs.size()) {
msgThis.add(msgs.get(total));
} else {
break;
}
}
ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
for (; total < msgs.size(); total++) {
msgThis.add(msgs.get(total));
}
this.submitConsumeRequestLater(consumeRequest);
}
}
}
}
orderly ConsumerRequestの作成
public void submitConsumeRequest(
final List msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume) {
if (dispathToConsume) {
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
}
}
concurrently ConsumeRequest#run消費主体ロジック
// ,
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
orderly ConsumerRequest#run消費主体ロジック
//
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
for (boolean continueConsume = true; continueConsume; ) {
// TreeMap
List msgs = this.processQueue.takeMessags(consumeBatchSize);
if (!msgs.isEmpty()) {
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} else {
continueConsume = false;
}
}
...
}
public class MessageQueueLock {
private ConcurrentMap mqLockTable = new ConcurrentHashMap();
public Object fetchLockObject(final MessageQueue mq) {
Object objLock = this.mqLockTable.get(mq);
if (null == objLock) {
objLock = new Object();
Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
if (prevLock != null) {
objLock = prevLock;
}
}
return objLock;
}
}
offsetコミットoffsetについては、消費者がbrokerから引き出した次のメッセージのオフセット量である
takeメッセージを順次消費する場合は、メッセージをmsgTreeMapから取り出し、consumingMsgOrderlyTreeMapに入れて消費完了後、consumingMsgOrderlyTreeMapでのメッセージが完全に消費することを示す、consumingMsgOrderlyTreeMapをクリアし、offset=thisを設定する.consumingMsgOrderlyTreeMap.lastKey() + 1
// org.apache.rocketmq.client.impl.consumer.ProcessQueue#commit
public long commit() {
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) {
msgSize.addAndGet(0 - msg.getBody().length);
}
this.consumingMsgOrderlyTreeMap.clear();
if (offset != null) {
return offset + 1;
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("commit exception", e);
}
return -1;
}
同時消費
メッセージをmsgTreeMapから直接削除し、msgTreeMapの最初のメッセージのqueue offset値を返します.
// org.apache.rocketmq.client.impl.consumer.ProcessQueue#removeMessage
public long removeMessage(final List msgs) {
long result = -1;
final long now = System.currentTimeMillis();
try {
this.lockTreeMap.writeLock().lockInterruptibly();
this.lastConsumeTimestamp = now;
try {
if (!msgTreeMap.isEmpty()) {
result = this.queueOffsetMax + 1;
int removedCnt = 0;
for (MessageExt msg : msgs) {
MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
if (prev != null) {
removedCnt--;
msgSize.addAndGet(0 - msg.getBody().length);
}
}
msgCount.addAndGet(removedCnt);
if (!msgTreeMap.isEmpty()) {
result = msgTreeMap.firstKey();
}
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (Throwable t) {
log.error("removeMessage exception", t);
}
return result;
}
消費失敗
順次消費し、メッセージの処理に失敗し、再試行回数がしきい値より小さい場合はconsumingMsgOrderlyTreeMapからメッセージを取り出し、msgTreeMapに再読み込みし、再試行回数がしきい値を超えるとbrokerにメッセージを送信し、brokerは再試行回数に応じてSCHDULEにメッセージを送信する.TOPIC_XXXXまたはデッドラインキュー
同時消費は、メッセージの処理に失敗し、brokerに送信され、送信に失敗した場合、消費を継続します.
ConsumerRequestをコミットする2つのタイミング.1つはメッセージをプルすることであり、2つは処理中に異常が発生した後にコミットを遅延することである.