RocketMQソースを手で引き裂く--ConsumeMessageService


引用する

  • は、メッセージのコア処理クラスを抽出する.
  • は全部で2つの実現があり、同時消費と順次消費がある.
  • はもちろん同時消費の実現が順次消費より簡単であると考えている.では、次に情報を引き出す過程の流れを見てみましょう.

  • 手で鶏を引き裂く


    ConsumeMessageOrderlyService


    前編の起動プロセスでstartメソッドが呼び出されました.何?startがどこで呼び出されたのかまだ分かりません.RocketMQソースコードのDefaultMQpushConsumerを手で引き裂くのを見てください.転送ゲートの具体的なソースコードは以下でMQをタイミングロックしています.クラスタ消費であれば.
    public void start() {
         
    //  
            if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
         
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
         
                    @Override
                    public void run() {
         
                        ConsumeMessageOrderlyService.this.lockMQPeriodically();
                    }
                }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
            }
        }
    

    メッセージ要求の処理

    public void submitConsumeRequest(
            final List<MessageExt> msgs,
            final ProcessQueue processQueue,
            final MessageQueue messageQueue,
            final boolean dispathToConsume) {
         
            if (dispathToConsume) {
         
                ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
                this.consumeExecutor.submit(consumeRequest);
            }
        }
    

    コアのメッセージ処理プロセス

    public void run() {
         
                //  
                if (this.processQueue.isDropped()) {
         
                    log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                    return;
                }
                //  
                final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
                synchronized (objLock) {
         
                    //  
                    //    。
                    if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                        || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
         
                        final long beginTime = System.currentTimeMillis();
                        for (boolean continueConsume = true; continueConsume; ) {
         
                            if (this.processQueue.isDropped()) {
         
                                log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                                break;
                            }
    
                            if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                                && !this.processQueue.isLocked()) {
         
                                log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                                ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                                break;
                            }
    
                            if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                                && this.processQueue.isLockExpired()) {
         
                                log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
                                ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                                break;
                            }
    
                            long interval = System.currentTimeMillis() - beginTime;
                            if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
         
                                ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                                break;
                            }
    
                            final int consumeBatchSize =
                                ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
                            //  
                            List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
                            defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
                            if (!msgs.isEmpty()) {
         
                                final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
    
                                ConsumeOrderlyStatus status = null;
    
                                ConsumeMessageContext consumeMessageContext = null;
                                if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
         
                                    consumeMessageContext = new ConsumeMessageContext();
                                    consumeMessageContext
                                        .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
                                    consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
                                    consumeMessageContext.setMq(messageQueue);
                                    consumeMessageContext.setMsgList(msgs);
                                    consumeMessageContext.setSuccess(false);
                                    // init the consume context type
                                    consumeMessageContext.setProps(new HashMap<String, String>());
                                    ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
                                }
    
                                long beginTimestamp = System.currentTimeMillis();
                                ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                                boolean hasException = false;
                                try {
         
                                    //  rebalance 
                                    this.processQueue.getLockConsume().lock();
                                    if (this.processQueue.isDropped()) {
         
                                        log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                            this.messageQueue);
                                        break;
                                    }
                                    //  
                                    status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                                } catch (Throwable e) {
         
                                    log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                                        RemotingHelper.exceptionSimpleDesc(e),
                                        ConsumeMessageOrderlyService.this.consumerGroup,
                                        msgs,
                                        messageQueue);
                                    hasException = true;
                                } finally {
         
                                    this.processQueue.getLockConsume().unlock();
                                }
    
                                if (null == status
                                    || ConsumeOrderlyStatus.ROLLBACK == status
                                    || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
         
                                    log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",
                                        ConsumeMessageOrderlyService.this.consumerGroup,
                                        msgs,
                                        messageQueue);
                                }
    
                                long consumeRT = System.currentTimeMillis() - beginTimestamp;
                                if (null == status) {
         
                                    if (hasException) {
         
                                        returnType = ConsumeReturnType.EXCEPTION;
                                    } else {
         
                                        returnType = ConsumeReturnType.RETURNNULL;
                                    }
                                } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
         
                                    returnType = ConsumeReturnType.TIME_OUT;
                                } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
         
                                    returnType = ConsumeReturnType.FAILED;
                                } else if (ConsumeOrderlyStatus.SUCCESS == status) {
         
                                    returnType = ConsumeReturnType.SUCCESS;
                                }
    
                                if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
         
                                    consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
                                }
    
                                if (null == status) {
         
                                    status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                                }
    
                                if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
         
                                    consumeMessageContext.setStatus(status.toString());
                                    consumeMessageContext
                                        .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
                                    ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
                                }
    
                                ConsumeMessageOrderlyService.this.getConsumerStatsManager()
                                    .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
                                //  
                                continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                            } else {
         
                                continueConsume = false;
                            }
                        }
                    } else {
         
                        if (this.processQueue.isDropped()) {
         
                            log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                            return;
                        }
    
                        ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
                    }
                }
            }
    

    ConsumeMessageConcurrentlyService

    public void start() {
         
            this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
         
    
                @Override
                public void run() {
         
                    cleanExpireMsg();
                }
    
            }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
        }
    

    要求メソッドの発行

    public void submitConsumeRequest(
            final List<MessageExt> msgs,
            final ProcessQueue processQueue,
            final MessageQueue messageQueue,
            final boolean dispatchToConsume) {
         
            final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
            
            if (msgs.size() <= consumeBatchSize) {
         
                ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
                try {
         
                    this.consumeExecutor.submit(consumeRequest);
                } catch (RejectedExecutionException e) {
         
                    this.submitConsumeRequestLater(consumeRequest);
                }
            } else {
         
                //  , 
                for (int total = 0; total < msgs.size(); ) {
         
                    List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
                    for (int i = 0; i < consumeBatchSize; i++, total++) {
         
                        if (total < msgs.size()) {
         
                            msgThis.add(msgs.get(total));
                        } else {
         
                            break;
                        }
                    }
    
                    ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
                    try {
         
                        this.consumeExecutor.submit(consumeRequest);
                    } catch (RejectedExecutionException e) {
         
                        for (; total < msgs.size(); total++) {
         
                            msgThis.add(msgs.get(total));
                        }
    
                        this.submitConsumeRequestLater(consumeRequest);
                    }
                }
            }
        }
    

    プロセスの処理

    public void run() {
         
                if (this.processQueue.isDropped()) {
         
                    log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
                    return;
                }
    
                MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
                ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
                ConsumeConcurrentlyStatus status = null;
                defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
    
                ConsumeMessageContext consumeMessageContext = null;
                if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
         
                    consumeMessageContext = new ConsumeMessageContext();
                    consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
                    consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
                    consumeMessageContext.setProps(new HashMap<String, String>());
                    consumeMessageContext.setMq(messageQueue);
                    consumeMessageContext.setMsgList(msgs);
                    consumeMessageContext.setSuccess(false);
                    ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
                }
    
                long beginTimestamp = System.currentTimeMillis();
                boolean hasException = false;
                ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                try {
         
                    if (msgs != null && !msgs.isEmpty()) {
         
                        for (MessageExt msg : msgs) {
         
                            MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                        }
                    }
                    //  
                    status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
                } catch (Throwable e) {
         
                    log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                        RemotingHelper.exceptionSimpleDesc(e),
                        ConsumeMessageConcurrentlyService.this.consumerGroup,
                        msgs,
                        messageQueue);
                    hasException = true;
                }
                long consumeRT = System.currentTimeMillis() - beginTimestamp;
                if (null == status) {
         
                    if (hasException) {
         
                        returnType = ConsumeReturnType.EXCEPTION;
                    } else {
         
                        returnType = ConsumeReturnType.RETURNNULL;
                    }
                } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
         
                    returnType = ConsumeReturnType.TIME_OUT;
                } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
         
                    returnType = ConsumeReturnType.FAILED;
                } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
         
                    returnType = ConsumeReturnType.SUCCESS;
                }
    
                if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
         
                    consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
                }
    
                if (null == status) {
         
                    log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
                        ConsumeMessageConcurrentlyService.this.consumerGroup,
                        msgs,
                        messageQueue);
                    status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
    
                if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
         
                    consumeMessageContext.setStatus(status.toString());
                    consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
                    ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
                }
    
                ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
                    .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
    
                if (!processQueue.isDropped()) {
         
                    //  
                    //  broker,topic retry , 16 DLQ
                    //  offset
                    //  
                    ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
                } else {
         
                    log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
                }
            }