RocketMQ:メッセージプルとメッセージキュー負荷分散メカニズム

69493 ワード

一、前言
Consumerメッセージ消費プロセスは複雑で、モジュールには、メッセージ検索、負荷等化、メッセージフィルタリング、メッセージ処理、再送確認、メッセージ進捗維持などが含まれています.紙面に限られ、本編では主にConsumer起動プロセスとメッセージ抽出実現メカニズムを紹介する.
メッセージ消費はグループのモードで展開され、1つの消費グループ内に複数の消費者を含むことができ、各消費グループは複数のTopicを購読することができ、消費グループの間にはクラスタモードと放送モードの2つの消費モードがある.
  • クラスタモード:Topicの下の同じメッセージは、そのうちの1つの消費者によってのみ消費されることを許可する.
  • ブロードキャストモード:Topicの下の同じメッセージはクラスタ内のすべての消費者に一度消費される.

  • メッセージングサーバと消費者間のメッセージングにも2つの方法があります.
  • プルモード:消費者側が自発的にプルメッセージ要求を開始する.
  • プッシュモード:メッセージがメッセージサーバに到着した後、メッセージ消費者にプッシュする.

  • なお、RocketMQメッセージプッシュモードの実現は、プルモードに基づいて、プルモードに1層包装され、1つのプルタスクが完了した後、次のプルタスクが開始される.
            
    二、問題
    Consumerの起動時にNamesrvアドレス、購読したTopic、メッセージリスナーを指定する必要があることは、公式サンプルを参照してください.
    public class Consumer {
        public static void main(String[] args) throws InterruptedException, MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
             
            // Specify name server addresses.
            consumer.setNamesrvAddr("localhost:9876");
            
            // Subscribe one more more topics to consume.
            consumer.subscribe("TopicTest", "*");
            // Register callback to execute on arrival of messages fetched from brokers.
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            //Launch the consumer instance.
            consumer.start();
            System.out.printf("Consumer Started.%n");
        }
    }
    

    では、Consumerはどのように起動し、Brokerからメッセージ消費を引き出すのか.
    では、クラスタモードでは、複数の消費者がメッセージキューをどのように負荷するのでしょうか.メッセージキュー負荷メカニズムは、1つのメッセージキューが同じ時間に1つの消費者によって消費されることを許可し、1つの消費者が複数のメッセージキューを消費することができるという共通の考え方に従う.
            
    三、消費者起動プロセス
    メッセージの消費と取得方法は2種類ありますが、ここでは日常開発でよく使われる方法:クラスタ消費、プッシュパターンについて分析し、D e f a u l t M QpushConsumerImpl#startメソッドのソースコードに入りましょう.
    org.apache.rocketmq.client.impl.consumer.DefaultMQPShConsumerImplコードフラグメント
    public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                //          ...
                
                //1.   topic     
                this.copySubscription();
                
                //2.   MQClientInstance  、   
                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
                
                //3           
                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
              	this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
                
                this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
                
                //4.        
                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                this.offsetStore.load();
                
                //5.         ,  、         
                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }
                this.consumeMessageService.start();
    
    			//6.   Consumer
                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                
                //7.   MQClientInstance
                mQClientFactory.start();
                this.serviceState = ServiceState.RUNNING;
                break;
        }
       	//     , NameServer  topic    
        this.updateTopicSubscribeInfoWhenSubscriptionChanged(); 
        this.mQClientFactory.checkClientInBroker();
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        this.mQClientFactory.rebalanceImmediately();
    }
    

    1.Topicの購読メッセージを構築し、Consumer起動前に設定したtopicテーマ名とフィルタ条件をカプセル化し、RebalanceImplに割り当て、後続の負荷均衡配分に遍歴条件を提供する.
    2、MQClientインスタンスを作成し、いくつかのサービスインスタンスを初期化し、後続のメッセージ負荷分配、メッセージ引き抜きをサポートする.
    3、Consuemrロードバランシングの例、割り当てポリシーを初期化し、ロードバランシングサービスによって定期的に呼び出し、消費キューを再割り当てする.
    4、Consumemr消費進捗ストレージ、ブロードキャストモードは、ローカルクライアントで消費進捗を維持する.クラスタモードは、Brokerから消費の進捗を取得し、ローカル消費が成功すると、Brokerに進捗を更新します.
    5、Comsuemrメッセージ消費サービス、メッセージ抽出サービスとのデカップリング、内部独立スレッドプール、抽出メッセージに対する非同期消費:成功、Brokerに消費の進度を更新する;失敗し、ブロードカーにメッセージを戻し、消費を遅らせる.
    6、Consumemr登録リスト.
    7、MQClientインスタンスを起動し、メッセージ引き出しサービス、メッセージ負荷分配サービスを起動し、Topicルーティング更新、Consumer消費進捗持続化などの定期的な実現などを起動する.
    具体的なソース分析を行います.
    org.apache.rocketmq.client.impl.factory.MQClientInstanceコードフラグメント
    public void start() throws MQClientException {
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    //          ...
                    
                    //    
                    this.mQClientAPIImpl.start(); 
                    
                    //      ,  namesrc     
                    this.startScheduledTask(); 
                    
                    //        
                    this.pullMessageService.start(); 
                    
                    //        ,        
                    this.rebalanceService.start();
                    
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    this.serviceState = ServiceState.RUNNING;
                    break;
            }
        }
    }
    

    クラスタモードでは、同じ消費グループ内に複数のメッセージ消費者があり、同じTopicに複数の消費キューが存在する場合、消費者はどのようにメッセージキュー負荷の均衡を行うのか.
    メッセージ・キューのロード・バランシングは、通常、1つの消費キューが同じ時間に1つのメッセージ・コンシューマによって消費されることを許可し、1つのメッセージ・コンシューマが複数のメッセージ・キューを同時に消費できるようにする方法ですが、RocketMQはどのように実現されますか?
    上記の問題を踏まえて,RocketMQメッセージの消費メカニズムを検討し始めた.MQClientInstanceの起動プロセスから、RocketMQはメッセージのプルメッセージングを担当するために別のスレッドサービスPullMessageServiceを使用していることがわかります.
    org.apache.rocketmq.client.impl.consumer.PullMessageServicerunコードフラグメント
    public void run() {
        log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take(); //       ,    
                this.pullMessage(pullRequest); //        
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }
        log.info(this.getServiceName() + " service end");
    }
    

    PullMessageServiceメッセージはサービススレッドを引き、pullRequestQueueキューからPullRequestメッセージ引きタスクを取得し、pullRequestQueueが空の場合、スレッドは引きタスクが挿入されるまでブロックされ、pullMessageメソッドを呼び出してメッセージ引きを行います.
    では、PullRequestはいつキューに入れられたのでしょうか.
    org.apache.rocketmq.client.impl.consumer.PullMessageServiceコードフラグメント
    public void executePullRequestImmediately(final PullRequest pullRequest) {
        try {
            this.pullRequestQueue.put(pullRequest); //          ,     
        } catch (InterruptedException e) {
            log.error("executePullRequestImmediately pullRequestQueue.put", e);
        }
    }
    
    public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
        if (!isStopped()) {
            this.scheduledExecutorService.schedule(new Runnable() {
                @Override
                public void run() {
                    PullMessageService.this.executePullRequestImmediately(pullRequest);
                }
            }, timeDelay, TimeUnit.MILLISECONDS);
        } else {
            log.warn("PullMessageServiceScheduledThread has shutdown");
        }
    }
    

    追跡により、PullRequestは2つの場所で作成されます.
    1、RocketMQはPullRequest引取タスクに従ってメッセージ引取を1回実行した後、PullRequestオブジェクトをpullRequestQueueに入れる.
    org.apache.rocketmq.client.impl.consumer.DefaultMQPShConsumerImplコードフラグメント
    public void pullMessage(final PullRequest pullRequest) {
        //      ...
        
        PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    //        
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            long prevRequestOffset = pullRequest.getNextOffset();
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            long pullRT = System.currentTimeMillis() - beginTimestamp;
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullRT);
                            long firstMsgOffset = Long.MAX_VALUE;
                            //          ,        ,   pullRequest      pullRequestQueue
                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            } else {
                                firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
                     			DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
    
                                boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( 
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);
    							//         ,      pullRequestQueue,             
                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                } else {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                }
                            }
                            //       ...
                            break;
                        //         ...
                    }
                }
            }
    
            @Override
            public void onException(Throwable e) {
                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            }
        };
        //      ...
        try {
            this.pullAPIWrapper.pullKernelImpl( //    
                pullRequest.getMessageQueue(),
                subExpression,
                subscriptionData.getExpressionType(),
                subscriptionData.getSubVersion(),
                pullRequest.getNextOffset(),
                this.defaultMQPushConsumer.getPullBatchSize(),
                sysFlag,
                commitOffsetValue,
                BROKER_SUSPEND_MAX_TIME_MILLIS,
                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                CommunicationMode.ASYNC,
                pullCallback //    ,    
            );
        } catch (Exception e) {
            log.error("pullKernelImpl exception", e);
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        }
    }
    

    2、RebalanceServiceサービススレッドによって登録された消費者がTopicを購読した情報をポーリングし、RebalanceImplで作成する.
    org.apache.rocketmq.client.impl.consumer.RebalanceService
    public void run() {
        log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance(); //    
        }
        log.info(this.getServiceName() + " service end");
    }
    

    RebalanceServiceスレッドは、デフォルトで20 sごとにmqClientFactoryを実行します.doRevalance()メソッド
    org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalanceコードクリップ
    public void doRebalance(final boolean isOrder) {
    	//      、  ,   DefaultMQPushConsumerImpl     ,     
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); 
        if (subTable != null) {
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    this.rebalanceByTopic(topic, isOrder); //  Topic    ,      
                } catch (Throwable e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }
        this.truncateMessageQueueNotMyTopic(); //      
    }
    
    private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case CLUSTERING: {
            	//  Topic       , Comsumer      org.apache.rocketmq.client.impl.factory.MQClientInstance#startScheduledTask       
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
    			
    			//      Broker                    ID
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                
                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);
    				//  ,              ,                  
                    Collections.sort(mqAll);
                    Collections.sort(cidAll);
    
    				//    ,    AllocateMessageQueueAveragely       ,
    				//  :8     ,3    , c1:q1、q2、q3,c2:q4、q5、q6,c3:q7、q8
                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
                    List<MessageQueue> allocateResult = null;
                    try {
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
                        return;
                    }
                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }
    				
    				//               ,      
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
        }
    }
    
    public void dispatchPullRequest(List<PullRequest> pullRequestList) {
        for (PullRequest pullRequest : pullRequestList) {
            this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
            log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
        }
    }
    

    これで,プルメッセージ要求はキューに入れられ,PullMessageServiceサービスによって消費される.
    参照
    『RocketMQテクノロジーの内幕』